arvados.diskcache

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5import threading
  6import mmap
  7import os
  8import traceback
  9import stat
 10import tempfile
 11import fcntl
 12import time
 13import errno
 14import logging
 15import weakref
 16import collections
 17
 18_logger = logging.getLogger('arvados.keep')
 19
 20cacheblock_suffix = ".keepcacheblock"
 21
 22class DiskCacheSlot(object):
 23    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")
 24
 25    def __init__(self, locator, cachedir):
 26        self.locator = locator
 27        self.ready = threading.Event()
 28        self.content = None
 29        self.cachedir = cachedir
 30        self.filehandle = None
 31        self.linger = None
 32
 33    def get(self):
 34        self.ready.wait()
 35        # 'content' can None, an empty byte string, or a nonempty mmap
 36        # region.  If it is an mmap region, we want to advise the
 37        # kernel we're going to use it.  This nudges the kernel to
 38        # re-read most or all of the block if necessary (instead of
 39        # just a few pages at a time), reducing the number of page
 40        # faults and improving performance by 4x compared to not
 41        # calling madvise.
 42        if self.content:
 43            self.content.madvise(mmap.MADV_WILLNEED)
 44        return self.content
 45
 46    def set(self, value):
 47        tmpfile = None
 48        try:
 49            if value is None:
 50                self.content = None
 51                self.ready.set()
 52                return False
 53
 54            if len(value) == 0:
 55                # Can't mmap a 0 length file
 56                self.content = b''
 57                self.ready.set()
 58                return True
 59
 60            if self.content is not None:
 61                # Has been set already
 62                self.ready.set()
 63                return False
 64
 65            blockdir = os.path.join(self.cachedir, self.locator[0:3])
 66            os.makedirs(blockdir, mode=0o700, exist_ok=True)
 67
 68            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
 69
 70            self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
 71            tmpfile = self.filehandle.name
 72            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
 73
 74            # aquire a shared lock, this tells other processes that
 75            # we're using this block and to please not delete it.
 76            fcntl.flock(self.filehandle, fcntl.LOCK_SH)
 77
 78            self.filehandle.write(value)
 79            self.filehandle.flush()
 80            os.rename(tmpfile, final)
 81            tmpfile = None
 82
 83            self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
 84            # only set the event when mmap is successful
 85            self.ready.set()
 86            return True
 87        finally:
 88            if tmpfile is not None:
 89                # If the tempfile hasn't been renamed on disk yet, try to delete it.
 90                try:
 91                    os.remove(tmpfile)
 92                except:
 93                    pass
 94
 95    def size(self):
 96        if self.content is None:
 97            if self.linger is not None:
 98                # If it is still lingering (object is still accessible
 99                # through the weak reference) it is still taking up
100                # space.
101                content = self.linger()
102                if content is not None:
103                    return len(content)
104            return 0
105        else:
106            return len(self.content)
107
108    def evict(self):
109        if not self.content:
110            return
111
112        # The mmap region might be in use when we decided to evict
113        # it.  This can happen if the cache is too small.
114        #
115        # If we call close() now, it'll throw an error if
116        # something tries to access it.
117        #
118        # However, we don't need to explicitly call mmap.close()
119        #
120        # I confirmed in mmapmodule.c that that both close
121        # and deallocate do the same thing:
122        #
123        # a) close the file descriptor
124        # b) unmap the memory range
125        #
126        # So we can forget it in the cache and delete the file on
127        # disk, and it will tear it down after any other
128        # lingering Python references to the mapped memory are
129        # gone.
130
131        blockdir = os.path.join(self.cachedir, self.locator[0:3])
132        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
133        try:
134            fcntl.flock(self.filehandle, fcntl.LOCK_UN)
135
136            # try to get an exclusive lock, this ensures other
137            # processes are not using the block.  It is
138            # nonblocking and will throw an exception if we
139            # can't get it, which is fine because that means
140            # we just won't try to delete it.
141            #
142            # I should note here, the file locking is not
143            # strictly necessary, we could just remove it and
144            # the kernel would ensure that the underlying
145            # inode remains available as long as other
146            # processes still have the file open.  However, if
147            # you have multiple processes sharing the cache
148            # and deleting each other's files, you'll end up
149            # with a bunch of ghost files that don't show up
150            # in the file system but are still taking up
151            # space, which isn't particularly user friendly.
152            # The locking strategy ensures that cache blocks
153            # in use remain visible.
154            #
155            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
156
157            os.remove(final)
158            return True
159        except OSError:
160            pass
161        finally:
162            self.filehandle = None
163            self.content = None
164
165    @staticmethod
166    def get_from_disk(locator, cachedir):
167        blockdir = os.path.join(cachedir, locator[0:3])
168        final = os.path.join(blockdir, locator) + cacheblock_suffix
169
170        try:
171            filehandle = open(final, "rb")
172
173            # aquire a shared lock, this tells other processes that
174            # we're using this block and to please not delete it.
175            fcntl.flock(filehandle, fcntl.LOCK_SH)
176
177            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
178            dc = DiskCacheSlot(locator, cachedir)
179            dc.filehandle = filehandle
180            dc.content = content
181            dc.ready.set()
182            return dc
183        except FileNotFoundError:
184            pass
185        except Exception as e:
186            traceback.print_exc()
187
188        return None
189
190    @staticmethod
191    def cache_usage(cachedir):
192        usage = 0
193        for root, dirs, files in os.walk(cachedir):
194            for name in files:
195                if not name.endswith(cacheblock_suffix):
196                    continue
197
198                blockpath = os.path.join(root, name)
199                res = os.stat(blockpath)
200                usage += res.st_size
201        return usage
202
203
204    @staticmethod
205    def init_cache(cachedir, maxslots):
206        #
207        # First check the disk cache works at all by creating a 1 byte cache entry
208        #
209        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
210        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
211        ds.set(b'a')
212        if checkexists is None:
213            # Don't keep the test entry around unless it existed beforehand.
214            ds.evict()
215
216        # map in all the files in the cache directory, up to max slots.
217        # after max slots, try to delete the excess blocks.
218        #
219        # this gives the calling process ownership of all the blocks
220
221        blocks = []
222        for root, dirs, files in os.walk(cachedir):
223            for name in files:
224                if not name.endswith(cacheblock_suffix):
225                    continue
226
227                blockpath = os.path.join(root, name)
228                res = os.stat(blockpath)
229
230                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
231                    blocks.append((name[0:32], res.st_atime))
232                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
233                    # found a temporary file more than 1 minute old,
234                    # try to delete it.
235                    try:
236                        os.remove(blockpath)
237                    except:
238                        pass
239
240        # sort by access time (atime), going from most recently
241        # accessed (highest timestamp) to least recently accessed
242        # (lowest timestamp).
243        blocks.sort(key=lambda x: x[1], reverse=True)
244
245        # Map in all the files we found, up to maxslots, if we exceed
246        # maxslots, start throwing things out.
247        cachelist: collections.OrderedDict = collections.OrderedDict()
248        for b in blocks:
249            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
250            if got is None:
251                continue
252            if len(cachelist) < maxslots:
253                cachelist[got.locator] = got
254            else:
255                # we found more blocks than maxslots, try to
256                # throw it out of the cache.
257                got.evict()
258
259        return cachelist
# NOTE: this point of the extracted page previously repeated the
# DiskCacheSlot listing above verbatim -- once as a second whole-class
# copy and once as per-member fragments (constructor, attributes, and
# each method with a bare header line).  The duplicated extraction
# residue carried no information not already present in the listing
# above and has been removed.