Module arvados.diskcache

Expand source code
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import threading
import mmap
import os
import traceback
import stat
import tempfile
import fcntl
import time
import errno
import logging
import weakref

# Module logger.  Note it uses the 'arvados.keep' logger name rather than
# this module's own name.
_logger = logging.getLogger('arvados.keep')

# Filename suffix shared by every cache block file: finished blocks are
# named "<locator>.keepcacheblock", in-progress writes "tmp*.keepcacheblock".
cacheblock_suffix = ".keepcacheblock"

class DiskCacheSlot(object):
    """A single Keep block cached as a file in an on-disk cache directory.

    Block data lives in ``<cachedir>/<locator[0:3]>/<locator>`` plus
    ``cacheblock_suffix`` and is exposed read-only through an mmap.  While
    a slot holds a block it keeps a shared ``flock`` on the file, which
    tells other processes sharing the cache directory not to delete it.
    """
    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")

    def __init__(self, locator, cachedir):
        """Create an empty slot for *locator*; nothing is written to disk yet."""
        self.locator = locator
        # Set once ``content`` has been filled in; get() waits on it.
        self.ready = threading.Event()
        # Read-only mmap of the block file, b'' for an empty block, or None.
        self.content = None
        self.cachedir = cachedir
        # Open file object that holds the shared lock on the block file.
        self.filehandle = None
        # Weak reference to the mmap dropped by evict(), if any.
        self.linger = None

    def get(self):
        """Block until the slot is ready, then return its content."""
        self.ready.wait()
        return self.content

    def set(self, value):
        """Fill the slot with *value* and mark it ready.

        - ``None``: content becomes None.
        - empty bytes: content becomes ``b''`` in memory, because a
          zero-length file cannot be mmapped.
        - otherwise, if the slot already has content it is kept as is;
          if not, the data is written to a temporary file in the block
          directory, renamed into place and mmapped read-only, with a
          shared lock held on the file.

        If writing or mapping fails, the exception propagates and the slot
        is not marked ready.  The temporary file (if not yet renamed) is
        removed and the file handle is closed, so the caller may retry.
        """
        tmpfile = None
        filehandle = None
        try:
            if value is None:
                self.content = None
                self.ready.set()
                return

            if len(value) == 0:
                # Can't mmap a 0 length file
                self.content = b''
                self.ready.set()
                return

            if self.content is not None:
                # Has been set already
                self.ready.set()
                return

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            os.makedirs(blockdir, mode=0o700, exist_ok=True)

            final = os.path.join(blockdir, self.locator) + cacheblock_suffix

            filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
            self.filehandle = filehandle
            tmpfile = filehandle.name
            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            filehandle.write(value)
            filehandle.flush()
            # Rename only after all data is written, so other processes
            # opening ``final`` see the complete block.
            os.rename(tmpfile, final)
            tmpfile = None

            self.content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            # only set the event when mmap is successful
            self.ready.set()
        except BaseException:
            # Don't leave the slot holding an open descriptor (and the
            # shared lock on it) for a block that was never stored.
            if filehandle is not None:
                if self.filehandle is filehandle:
                    self.filehandle = None
                try:
                    filehandle.close()
                except OSError:
                    pass
            raise
        finally:
            if tmpfile is not None:
                # If the tempfile hasn't been renamed on disk yet, try to delete it.
                try:
                    os.remove(tmpfile)
                except OSError:
                    pass

    def size(self):
        """Return the number of bytes this slot accounts for.

        After eviction, the evicted mmap's length still counts for as
        long as it is reachable through the ``linger`` weak reference.
        """
        if self.content is None:
            if self.linger is not None:
                # If it is still lingering (object is still accessible
                # through the weak reference) it is still taking up
                # space.
                content = self.linger()
                if content is not None:
                    return len(content)
            return 0
        else:
            return len(self.content)

    def evict(self):
        """Drop this slot's mapped content and try to delete the block file.

        Returns True if the file was deleted.  Returns False if any of
        these hold:

        - the slot had no mapped content;
        - the block is still locked through another open file, e.g. by
          another process;
        - the path no longer refers to the file this slot had open.

        Whenever the slot had mapped content, it is left with no content
        or file handle, and ``linger`` becomes a weak reference to the old
        mmap.
        """
        if self.content is not None and len(self.content) > 0:
            # The mmap region might be in use when we decided to evict
            # it.  This can happen if the cache is too small.
            #
            # If we call close() now, it'll throw an error if
            # something tries to access it.
            #
            # However, we don't need to explicitly call mmap.close()
            #
            # I confirmed in mmapmodule.c that both close
            # and deallocate do the same thing:
            #
            # a) close the file descriptor
            # b) unmap the memory range
            #
            # So we can forget it in the cache and delete the file on
            # disk, and it will tear it down after any other
            # lingering Python references to the mapped memory are
            # gone.

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
            try:
                fcntl.flock(self.filehandle, fcntl.LOCK_UN)

                # try to get an exclusive lock, this ensures other
                # processes are not using the block.  It is
                # nonblocking and will throw an exception if we
                # can't get it, which is fine because that means
                # we just won't try to delete it.
                #
                # I should note here, the file locking is not
                # strictly necessary, we could just remove it and
                # the kernel would ensure that the underlying
                # inode remains available as long as other
                # processes still have the file open.  However, if
                # you have multiple processes sharing the cache
                # and deleting each other's files, you'll end up
                # with a bunch of ghost files that don't show up
                # in the file system but are still taking up
                # space, which isn't particularly user friendly.
                # The locking strategy ensures that cache blocks
                # in use remain visible.
                #
                fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

                # Between dropping the shared lock and taking the
                # exclusive one, another process may have evicted this
                # block and written a fresh copy under the same name.
                # Two processes writing the same block can also leave
                # the later rename in place of our file.  Only unlink
                # the path if it still names the file we hold locked;
                # otherwise we would delete a block that another
                # process is using.
                if not os.path.samestat(os.fstat(self.filehandle.fileno()), os.stat(final)):
                    return False

                os.remove(final)
                return True
            except OSError:
                pass
            finally:
                self.filehandle = None
                self.linger = weakref.ref(self.content)
                self.content = None
        return False

    def gone(self):
        """Return True if the slot is empty and no evicted mmap is still referenced."""
        # Test if an evicted object is lingering
        return self.content is None and (self.linger is None or self.linger() is None)

    @staticmethod
    def get_from_disk(locator, cachedir):
        """Load an existing cache block for *locator* from *cachedir*.

        Returns a ready DiskCacheSlot whose content is a read-only mmap of
        the block file and which holds a shared lock on it.  Returns None
        if the block is not on disk or could not be loaded; unexpected
        errors are printed with a traceback.  If the block is not loaded,
        the file handle is closed again.
        """
        blockdir = os.path.join(cachedir, locator[0:3])
        final = os.path.join(blockdir, locator) + cacheblock_suffix

        filehandle = None
        try:
            filehandle = open(final, "rb")

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            # While we waited for the lock, another process may have
            # evicted the block (and possibly written a new copy under
            # the same name).  Only use the file if the cache directory
            # still points to it.
            if os.path.samestat(os.fstat(filehandle.fileno()), os.stat(final)):
                content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
                dc = DiskCacheSlot(locator, cachedir)
                dc.filehandle = filehandle
                dc.content = content
                dc.ready.set()
                return dc
        except FileNotFoundError:
            pass
        except Exception:
            traceback.print_exc()

        # Not loaded: don't leak the descriptor (or the lock it holds).
        if filehandle is not None:
            filehandle.close()
        return None

    @staticmethod
    def cache_usage(cachedir):
        """Return the total size in bytes of all cache block files under *cachedir*.

        Both finished blocks and in-progress temporary files (anything
        ending in the cache block suffix) are counted.  Files that vanish
        while the directory is being scanned are skipped.
        """
        usage = 0
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue
                try:
                    usage += os.stat(os.path.join(root, name)).st_size
                except FileNotFoundError:
                    # set() renames temporary files into place and other
                    # processes delete evicted blocks, so an entry listed
                    # by os.walk may already be gone.
                    continue
        return usage

    @staticmethod
    def init_cache(cachedir, maxslots):
        """Check that the disk cache works, then load existing blocks.

        A one-byte test block (locator = MD5 of b'a') is written first, so
        an unusable cache directory fails here with an exception.  The
        test block is evicted again unless it was already present.

        Existing block files are mapped in, most recently accessed first,
        up to *maxslots*; further blocks are evicted.  Temporary files
        older than one minute are deleted.

        Returns the list of loaded DiskCacheSlot objects.
        """
        #
        # First check the disk cache works at all by creating a 1 byte cache entry
        #
        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds.set(b'a')
        if checkexists is None:
            # Don't keep the test entry around unless it existed beforehand.
            ds.evict()

        # map in all the files in the cache directory, up to max slots.
        # after max slots, try to delete the excess blocks.
        #
        # this gives the calling process ownership of all the blocks

        blocks = []
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                try:
                    res = os.stat(blockpath)
                except FileNotFoundError:
                    # Renamed or deleted by another process after
                    # os.walk listed it.
                    continue

                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                    blocks.append((name[0:32], res.st_atime))
                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                    # found a temporary file more than 1 minute old,
                    # try to delete it.
                    try:
                        os.remove(blockpath)
                    except OSError:
                        pass

        # sort by access time (atime), going from most recently
        # accessed (highest timestamp) to least recently accessed
        # (lowest timestamp).
        blocks.sort(key=lambda x: x[1], reverse=True)

        # Map in all the files we found, up to maxslots, if we exceed
        # maxslots, start throwing things out.
        cachelist = []
        for b in blocks:
            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
            if got is None:
                continue
            if len(cachelist) < maxslots:
                cachelist.append(got)
            else:
                # we found more blocks than maxslots, try to
                # throw it out of the cache.
                got.evict()

        return cachelist

Classes

class DiskCacheSlot (locator, cachedir)
Expand source code
class DiskCacheSlot(object):
    """A single Keep block cached as a file in an on-disk cache directory.

    Block data lives in ``<cachedir>/<locator[0:3]>/<locator>`` plus
    ``cacheblock_suffix`` and is exposed read-only through an mmap.  While
    a slot holds a block it keeps a shared ``flock`` on the file, which
    tells other processes sharing the cache directory not to delete it.
    """
    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")

    def __init__(self, locator, cachedir):
        """Create an empty slot for *locator*; nothing is written to disk yet."""
        self.locator = locator
        # Set once ``content`` has been filled in; get() waits on it.
        self.ready = threading.Event()
        # Read-only mmap of the block file, b'' for an empty block, or None.
        self.content = None
        self.cachedir = cachedir
        # Open file object that holds the shared lock on the block file.
        self.filehandle = None
        # Weak reference to the mmap dropped by evict(), if any.
        self.linger = None

    def get(self):
        """Block until the slot is ready, then return its content."""
        self.ready.wait()
        return self.content

    def set(self, value):
        """Fill the slot with *value* and mark it ready.

        - ``None``: content becomes None.
        - empty bytes: content becomes ``b''`` in memory, because a
          zero-length file cannot be mmapped.
        - otherwise, if the slot already has content it is kept as is;
          if not, the data is written to a temporary file in the block
          directory, renamed into place and mmapped read-only, with a
          shared lock held on the file.

        If writing or mapping fails, the exception propagates and the slot
        is not marked ready.  The temporary file (if not yet renamed) is
        removed and the file handle is closed, so the caller may retry.
        """
        tmpfile = None
        filehandle = None
        try:
            if value is None:
                self.content = None
                self.ready.set()
                return

            if len(value) == 0:
                # Can't mmap a 0 length file
                self.content = b''
                self.ready.set()
                return

            if self.content is not None:
                # Has been set already
                self.ready.set()
                return

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            os.makedirs(blockdir, mode=0o700, exist_ok=True)

            final = os.path.join(blockdir, self.locator) + cacheblock_suffix

            filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
            self.filehandle = filehandle
            tmpfile = filehandle.name
            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            filehandle.write(value)
            filehandle.flush()
            # Rename only after all data is written, so other processes
            # opening ``final`` see the complete block.
            os.rename(tmpfile, final)
            tmpfile = None

            self.content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            # only set the event when mmap is successful
            self.ready.set()
        except BaseException:
            # Don't leave the slot holding an open descriptor (and the
            # shared lock on it) for a block that was never stored.
            if filehandle is not None:
                if self.filehandle is filehandle:
                    self.filehandle = None
                try:
                    filehandle.close()
                except OSError:
                    pass
            raise
        finally:
            if tmpfile is not None:
                # If the tempfile hasn't been renamed on disk yet, try to delete it.
                try:
                    os.remove(tmpfile)
                except OSError:
                    pass

    def size(self):
        """Return the number of bytes this slot accounts for.

        After eviction, the evicted mmap's length still counts for as
        long as it is reachable through the ``linger`` weak reference.
        """
        if self.content is None:
            if self.linger is not None:
                # If it is still lingering (object is still accessible
                # through the weak reference) it is still taking up
                # space.
                content = self.linger()
                if content is not None:
                    return len(content)
            return 0
        else:
            return len(self.content)

    def evict(self):
        """Drop this slot's mapped content and try to delete the block file.

        Returns True if the file was deleted.  Returns False if any of
        these hold:

        - the slot had no mapped content;
        - the block is still locked through another open file, e.g. by
          another process;
        - the path no longer refers to the file this slot had open.

        Whenever the slot had mapped content, it is left with no content
        or file handle, and ``linger`` becomes a weak reference to the old
        mmap.
        """
        if self.content is not None and len(self.content) > 0:
            # The mmap region might be in use when we decided to evict
            # it.  This can happen if the cache is too small.
            #
            # If we call close() now, it'll throw an error if
            # something tries to access it.
            #
            # However, we don't need to explicitly call mmap.close()
            #
            # I confirmed in mmapmodule.c that both close
            # and deallocate do the same thing:
            #
            # a) close the file descriptor
            # b) unmap the memory range
            #
            # So we can forget it in the cache and delete the file on
            # disk, and it will tear it down after any other
            # lingering Python references to the mapped memory are
            # gone.

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
            try:
                fcntl.flock(self.filehandle, fcntl.LOCK_UN)

                # try to get an exclusive lock, this ensures other
                # processes are not using the block.  It is
                # nonblocking and will throw an exception if we
                # can't get it, which is fine because that means
                # we just won't try to delete it.
                #
                # I should note here, the file locking is not
                # strictly necessary, we could just remove it and
                # the kernel would ensure that the underlying
                # inode remains available as long as other
                # processes still have the file open.  However, if
                # you have multiple processes sharing the cache
                # and deleting each other's files, you'll end up
                # with a bunch of ghost files that don't show up
                # in the file system but are still taking up
                # space, which isn't particularly user friendly.
                # The locking strategy ensures that cache blocks
                # in use remain visible.
                #
                fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

                # Between dropping the shared lock and taking the
                # exclusive one, another process may have evicted this
                # block and written a fresh copy under the same name.
                # Two processes writing the same block can also leave
                # the later rename in place of our file.  Only unlink
                # the path if it still names the file we hold locked;
                # otherwise we would delete a block that another
                # process is using.
                if not os.path.samestat(os.fstat(self.filehandle.fileno()), os.stat(final)):
                    return False

                os.remove(final)
                return True
            except OSError:
                pass
            finally:
                self.filehandle = None
                self.linger = weakref.ref(self.content)
                self.content = None
        return False

    def gone(self):
        """Return True if the slot is empty and no evicted mmap is still referenced."""
        # Test if an evicted object is lingering
        return self.content is None and (self.linger is None or self.linger() is None)

    @staticmethod
    def get_from_disk(locator, cachedir):
        """Load an existing cache block for *locator* from *cachedir*.

        Returns a ready DiskCacheSlot whose content is a read-only mmap of
        the block file and which holds a shared lock on it.  Returns None
        if the block is not on disk or could not be loaded; unexpected
        errors are printed with a traceback.  If the block is not loaded,
        the file handle is closed again.
        """
        blockdir = os.path.join(cachedir, locator[0:3])
        final = os.path.join(blockdir, locator) + cacheblock_suffix

        filehandle = None
        try:
            filehandle = open(final, "rb")

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            # While we waited for the lock, another process may have
            # evicted the block (and possibly written a new copy under
            # the same name).  Only use the file if the cache directory
            # still points to it.
            if os.path.samestat(os.fstat(filehandle.fileno()), os.stat(final)):
                content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
                dc = DiskCacheSlot(locator, cachedir)
                dc.filehandle = filehandle
                dc.content = content
                dc.ready.set()
                return dc
        except FileNotFoundError:
            pass
        except Exception:
            traceback.print_exc()

        # Not loaded: don't leak the descriptor (or the lock it holds).
        if filehandle is not None:
            filehandle.close()
        return None

    @staticmethod
    def cache_usage(cachedir):
        """Return the total size in bytes of all cache block files under *cachedir*.

        Both finished blocks and in-progress temporary files (anything
        ending in the cache block suffix) are counted.  Files that vanish
        while the directory is being scanned are skipped.
        """
        usage = 0
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue
                try:
                    usage += os.stat(os.path.join(root, name)).st_size
                except FileNotFoundError:
                    # set() renames temporary files into place and other
                    # processes delete evicted blocks, so an entry listed
                    # by os.walk may already be gone.
                    continue
        return usage

    @staticmethod
    def init_cache(cachedir, maxslots):
        """Check that the disk cache works, then load existing blocks.

        A one-byte test block (locator = MD5 of b'a') is written first, so
        an unusable cache directory fails here with an exception.  The
        test block is evicted again unless it was already present.

        Existing block files are mapped in, most recently accessed first,
        up to *maxslots*; further blocks are evicted.  Temporary files
        older than one minute are deleted.

        Returns the list of loaded DiskCacheSlot objects.
        """
        #
        # First check the disk cache works at all by creating a 1 byte cache entry
        #
        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds.set(b'a')
        if checkexists is None:
            # Don't keep the test entry around unless it existed beforehand.
            ds.evict()

        # map in all the files in the cache directory, up to max slots.
        # after max slots, try to delete the excess blocks.
        #
        # this gives the calling process ownership of all the blocks

        blocks = []
        for root, _dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                try:
                    res = os.stat(blockpath)
                except FileNotFoundError:
                    # Renamed or deleted by another process after
                    # os.walk listed it.
                    continue

                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                    blocks.append((name[0:32], res.st_atime))
                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                    # found a temporary file more than 1 minute old,
                    # try to delete it.
                    try:
                        os.remove(blockpath)
                    except OSError:
                        pass

        # sort by access time (atime), going from most recently
        # accessed (highest timestamp) to least recently accessed
        # (lowest timestamp).
        blocks.sort(key=lambda x: x[1], reverse=True)

        # Map in all the files we found, up to maxslots, if we exceed
        # maxslots, start throwing things out.
        cachelist = []
        for b in blocks:
            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
            if got is None:
                continue
            if len(cachelist) < maxslots:
                cachelist.append(got)
            else:
                # we found more blocks than maxslots, try to
                # throw it out of the cache.
                got.evict()

        return cachelist

Static methods

def cache_usage(cachedir)
Expand source code
@staticmethod
def cache_usage(cachedir):
    """Return the total size in bytes of all cache block files under *cachedir*.

    Both finished blocks and in-progress temporary files (anything
    ending in the cache block suffix) are counted.  Files that vanish
    while the directory is being scanned are skipped.
    """
    usage = 0
    for root, _dirs, files in os.walk(cachedir):
        for name in files:
            if not name.endswith(cacheblock_suffix):
                continue
            try:
                usage += os.stat(os.path.join(root, name)).st_size
            except FileNotFoundError:
                # set() renames temporary files into place and other
                # processes delete evicted blocks, so an entry listed
                # by os.walk may already be gone.
                continue
    return usage
def get_from_disk(locator, cachedir)
Expand source code
@staticmethod
def get_from_disk(locator, cachedir):
    """Load an existing cache block for *locator* from *cachedir*.

    Returns a ready DiskCacheSlot whose content is a read-only mmap of
    the block file and which holds a shared lock on it.  Returns None
    if the block is not on disk or could not be loaded; unexpected
    errors are printed with a traceback.  If the block is not loaded,
    the file handle is closed again.
    """
    blockdir = os.path.join(cachedir, locator[0:3])
    final = os.path.join(blockdir, locator) + cacheblock_suffix

    filehandle = None
    try:
        filehandle = open(final, "rb")

        # acquire a shared lock, this tells other processes that
        # we're using this block and to please not delete it.
        fcntl.flock(filehandle, fcntl.LOCK_SH)

        # While we waited for the lock, another process may have
        # evicted the block (and possibly written a new copy under
        # the same name).  Only use the file if the cache directory
        # still points to it.
        if os.path.samestat(os.fstat(filehandle.fileno()), os.stat(final)):
            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            dc = DiskCacheSlot(locator, cachedir)
            dc.filehandle = filehandle
            dc.content = content
            dc.ready.set()
            return dc
    except FileNotFoundError:
        pass
    except Exception:
        traceback.print_exc()

    # Not loaded: don't leak the descriptor (or the lock it holds).
    if filehandle is not None:
        filehandle.close()
    return None
def init_cache(cachedir, maxslots)
Expand source code
@staticmethod
def init_cache(cachedir, maxslots):
    """Check that the disk cache works, then load existing blocks.

    A one-byte test block (locator = MD5 of b'a') is written first, so
    an unusable cache directory fails here with an exception.  The
    test block is evicted again unless it was already present.

    Existing block files are mapped in, most recently accessed first,
    up to *maxslots*; further blocks are evicted.  Temporary files
    older than one minute are deleted.

    Returns the list of loaded DiskCacheSlot objects.
    """
    #
    # First check the disk cache works at all by creating a 1 byte cache entry
    #
    checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
    ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
    ds.set(b'a')
    if checkexists is None:
        # Don't keep the test entry around unless it existed beforehand.
        ds.evict()

    # map in all the files in the cache directory, up to max slots.
    # after max slots, try to delete the excess blocks.
    #
    # this gives the calling process ownership of all the blocks

    blocks = []
    for root, _dirs, files in os.walk(cachedir):
        for name in files:
            if not name.endswith(cacheblock_suffix):
                continue

            blockpath = os.path.join(root, name)
            try:
                res = os.stat(blockpath)
            except FileNotFoundError:
                # Renamed or deleted by another process after
                # os.walk listed it.
                continue

            if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                blocks.append((name[0:32], res.st_atime))
            elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                # found a temporary file more than 1 minute old,
                # try to delete it.
                try:
                    os.remove(blockpath)
                except OSError:
                    pass

    # sort by access time (atime), going from most recently
    # accessed (highest timestamp) to least recently accessed
    # (lowest timestamp).
    blocks.sort(key=lambda x: x[1], reverse=True)

    # Map in all the files we found, up to maxslots, if we exceed
    # maxslots, start throwing things out.
    cachelist = []
    for b in blocks:
        got = DiskCacheSlot.get_from_disk(b[0], cachedir)
        if got is None:
            continue
        if len(cachelist) < maxslots:
            cachelist.append(got)
        else:
            # we found more blocks than maxslots, try to
            # throw it out of the cache.
            got.evict()

    return cachelist

Instance variables

var cachedir

Root directory of the disk cache. Each block is stored in a subdirectory named after the first three characters of its locator.

var content

The cached block data: a read-only mmap of the block file, b'' for a zero-length block, or None if the slot is unset or has been evicted.

var filehandle

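Open file object for the block file. It holds the shared flock that tells other processes the block is in use; it is None before the block is stored and after eviction.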

var linger

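Weak reference to the mmap released by evict(). size() and gone() use it to tell whether evicted data is still referenced; it is None if the slot was never evicted.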

var locator

The block locator (a 32-character hash) that names the cache file.

var ready

threading.Event that is set once content has been filled in; get() waits on it.

Methods

def evict(self)
Expand source code
def evict(self):
    """Drop this slot's mapped content and try to delete the block file.

    Returns True if the file was deleted.  Returns False if any of
    these hold:

    - the slot had no mapped content;
    - the block is still locked through another open file, e.g. by
      another process;
    - the path no longer refers to the file this slot had open.

    Whenever the slot had mapped content, it is left with no content
    or file handle, and ``linger`` becomes a weak reference to the old
    mmap.
    """
    if self.content is not None and len(self.content) > 0:
        # The mmap region might be in use when we decided to evict
        # it.  This can happen if the cache is too small.
        #
        # If we call close() now, it'll throw an error if
        # something tries to access it.
        #
        # However, we don't need to explicitly call mmap.close()
        #
        # I confirmed in mmapmodule.c that both close
        # and deallocate do the same thing:
        #
        # a) close the file descriptor
        # b) unmap the memory range
        #
        # So we can forget it in the cache and delete the file on
        # disk, and it will tear it down after any other
        # lingering Python references to the mapped memory are
        # gone.

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
        try:
            fcntl.flock(self.filehandle, fcntl.LOCK_UN)

            # try to get an exclusive lock, this ensures other
            # processes are not using the block.  It is
            # nonblocking and will throw an exception if we
            # can't get it, which is fine because that means
            # we just won't try to delete it.
            #
            # I should note here, the file locking is not
            # strictly necessary, we could just remove it and
            # the kernel would ensure that the underlying
            # inode remains available as long as other
            # processes still have the file open.  However, if
            # you have multiple processes sharing the cache
            # and deleting each other's files, you'll end up
            # with a bunch of ghost files that don't show up
            # in the file system but are still taking up
            # space, which isn't particularly user friendly.
            # The locking strategy ensures that cache blocks
            # in use remain visible.
            #
            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # Between dropping the shared lock and taking the
            # exclusive one, another process may have evicted this
            # block and written a fresh copy under the same name.
            # Two processes writing the same block can also leave
            # the later rename in place of our file.  Only unlink
            # the path if it still names the file we hold locked;
            # otherwise we would delete a block that another
            # process is using.
            if not os.path.samestat(os.fstat(self.filehandle.fileno()), os.stat(final)):
                return False

            os.remove(final)
            return True
        except OSError:
            pass
        finally:
            self.filehandle = None
            self.linger = weakref.ref(self.content)
            self.content = None
    return False
def get(self)
Expand source code
def get(self):
    """Return the slot's content, blocking until set() has marked it ready."""
    ready_event = self.ready
    ready_event.wait()
    result = self.content
    return result
def gone(self)
Expand source code
def gone(self):
    """Report whether the slot is empty with no evicted mmap still alive.

    ``linger`` is the weak reference evict() keeps to the mmap it
    dropped; while that object is still referenced elsewhere the slot
    is not considered gone.
    """
    if self.content is not None:
        return False
    if self.linger is None:
        return True
    return self.linger() is None
def set(self, value)
Expand source code
def set(self, value):
    """Fill the slot with *value* and mark it ready.

    - ``None``: content becomes None.
    - empty bytes: content becomes ``b''`` in memory, because a
      zero-length file cannot be mmapped.
    - otherwise, if the slot already has content it is kept as is;
      if not, the data is written to a temporary file in the block
      directory, renamed into place and mmapped read-only, with a
      shared lock held on the file.

    If writing or mapping fails, the exception propagates and the slot
    is not marked ready.  The temporary file (if not yet renamed) is
    removed and the file handle is closed, so the caller may retry.
    """
    tmpfile = None
    filehandle = None
    try:
        if value is None:
            self.content = None
            self.ready.set()
            return

        if len(value) == 0:
            # Can't mmap a 0 length file
            self.content = b''
            self.ready.set()
            return

        if self.content is not None:
            # Has been set already
            self.ready.set()
            return

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        os.makedirs(blockdir, mode=0o700, exist_ok=True)

        final = os.path.join(blockdir, self.locator) + cacheblock_suffix

        filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
        self.filehandle = filehandle
        tmpfile = filehandle.name
        os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

        # acquire a shared lock, this tells other processes that
        # we're using this block and to please not delete it.
        fcntl.flock(filehandle, fcntl.LOCK_SH)

        filehandle.write(value)
        filehandle.flush()
        # Rename only after all data is written, so other processes
        # opening ``final`` see the complete block.
        os.rename(tmpfile, final)
        tmpfile = None

        self.content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
        # only set the event when mmap is successful
        self.ready.set()
    except BaseException:
        # Don't leave the slot holding an open descriptor (and the
        # shared lock on it) for a block that was never stored.
        if filehandle is not None:
            if self.filehandle is filehandle:
                self.filehandle = None
            try:
                filehandle.close()
            except OSError:
                pass
        raise
    finally:
        if tmpfile is not None:
            # If the tempfile hasn't been renamed on disk yet, try to delete it.
            try:
                os.remove(tmpfile)
            except OSError:
                pass
def size(self)
Expand source code
def size(self):
    """Return how many bytes this slot accounts for.

    Live content counts at its full length.  Once content has been
    dropped, the slot keeps counting the evicted mmap's length for as
    long as that object is still reachable through the ``linger``
    weak reference; otherwise it counts as 0.
    """
    if self.content is not None:
        return len(self.content)
    lingering = None if self.linger is None else self.linger()
    return 0 if lingering is None else len(lingering)