arvados.diskcache
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import threading
import mmap
import os
import traceback
import stat
import tempfile
import fcntl
import time
import errno
import logging
import weakref
import collections

_logger = logging.getLogger('arvados.keep')

# Filename suffix that marks a file as a completed disk cache block.
cacheblock_suffix = ".keepcacheblock"

class DiskCacheSlot(object):
    """One Keep block cached as an mmap-ed, flock-protected file on disk.

    A slot starts empty; readers block in get() until a writer publishes
    the block with set().  On disk a block lives at
    <cachedir>/<locator[0:3]>/<locator> + cacheblock_suffix, and the slot
    holds a shared flock on it while mapped so that other processes
    sharing the cache directory know not to delete it.
    """

    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")

    def __init__(self, locator, cachedir):
        self.locator = locator
        # Signaled once 'content' has been decided (even when it is None).
        self.ready = threading.Event()
        self.content = None
        self.cachedir = cachedir
        self.filehandle = None
        # Optional weakref to evicted content that may still be alive
        # elsewhere; consulted by size().  Managed by external cache code.
        self.linger = None

    def get(self):
        """Block until the slot is populated, then return its contents.

        Returns None (no content), b'' (empty block), or a read-only
        mmap region.
        """
        self.ready.wait()
        # 'content' can be None, an empty byte string, or a nonempty mmap
        # region. If it is an mmap region, we want to advise the
        # kernel we're going to use it. This nudges the kernel to
        # re-read most or all of the block if necessary (instead of
        # just a few pages at a time), reducing the number of page
        # faults and improving performance by 4x compared to not
        # calling madvise.
        if self.content and hasattr(mmap.mmap, 'madvise'):
            self.content.madvise(mmap.MADV_WILLNEED)
        return self.content

    def set(self, value):
        """Publish 'value' as this slot's contents and wake waiting readers.

        Nonempty values are written to a temporary file, atomically
        renamed into place, then mmap-ed back as the slot contents.
        Returns True if a value was cached, False if value is None or the
        slot was already set.
        """
        tmpfile = None
        try:
            if value is None:
                self.content = None
                self.ready.set()
                return False

            if len(value) == 0:
                # Can't mmap a 0 length file
                self.content = b''
                self.ready.set()
                return True

            if self.content is not None:
                # Has been set already
                self.ready.set()
                return False

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            os.makedirs(blockdir, mode=0o700, exist_ok=True)

            final = os.path.join(blockdir, self.locator) + cacheblock_suffix

            self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
            tmpfile = self.filehandle.name
            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(self.filehandle, fcntl.LOCK_SH)

            self.filehandle.write(value)
            self.filehandle.flush()
            os.rename(tmpfile, final)
            tmpfile = None

            self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            # only set the event when mmap is successful
            self.ready.set()
            return True
        finally:
            if tmpfile is not None:
                # If the tempfile hasn't been renamed on disk yet, try to delete it.
                try:
                    os.remove(tmpfile)
                except OSError:
                    # Best-effort cleanup; nothing useful to do on failure.
                    pass

    def size(self):
        """Return the number of bytes this slot currently accounts for."""
        if self.content is None:
            if self.linger is not None:
                # If it is still lingering (object is still accessible
                # through the weak reference) it is still taking up
                # space.
                content = self.linger()
                if content is not None:
                    return len(content)
            return 0
        else:
            return len(self.content)

    def evict(self):
        """Delete this block's file from disk if no other process holds it.

        Returns True when the file was removed, False when there was
        nothing to evict or another process still has the block locked.
        """
        if not self.content:
            return False

        # The mmap region might be in use when we decided to evict
        # it. This can happen if the cache is too small.
        #
        # If we call close() now, it'll throw an error if
        # something tries to access it.
        #
        # However, we don't need to explicitly call mmap.close()
        #
        # I confirmed in mmapmodule.c that both close
        # and deallocate do the same thing:
        #
        # a) close the file descriptor
        # b) unmap the memory range
        #
        # So we can forget it in the cache and delete the file on
        # disk, and it will tear it down after any other
        # lingering Python references to the mapped memory are
        # gone.

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
        try:
            fcntl.flock(self.filehandle, fcntl.LOCK_UN)

            # try to get an exclusive lock, this ensures other
            # processes are not using the block. It is
            # nonblocking and will throw an exception if we
            # can't get it, which is fine because that means
            # we just won't try to delete it.
            #
            # I should note here, the file locking is not
            # strictly necessary, we could just remove it and
            # the kernel would ensure that the underlying
            # inode remains available as long as other
            # processes still have the file open. However, if
            # you have multiple processes sharing the cache
            # and deleting each other's files, you'll end up
            # with a bunch of ghost files that don't show up
            # in the file system but are still taking up
            # space, which isn't particularly user friendly.
            # The locking strategy ensures that cache blocks
            # in use remain visible.
            #
            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

            os.remove(final)
            return True
        except OSError:
            # Another process holds the lock, or the file is already gone.
            return False
        finally:
            self.filehandle = None
            self.content = None

    @staticmethod
    def get_from_disk(locator, cachedir):
        """Return a ready DiskCacheSlot mapped from an existing cache file.

        Returns None if the block is not cached on disk or cannot be
        opened/mapped.
        """
        blockdir = os.path.join(cachedir, locator[0:3])
        final = os.path.join(blockdir, locator) + cacheblock_suffix

        try:
            filehandle = open(final, "rb")

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            dc = DiskCacheSlot(locator, cachedir)
            dc.filehandle = filehandle
            dc.content = content
            dc.ready.set()
            return dc
        except FileNotFoundError:
            pass
        except Exception:
            traceback.print_exc()

        return None

    @staticmethod
    def cache_usage(cachedir):
        """Return the total size in bytes of all cache block files under cachedir."""
        usage = 0
        for root, dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)
                usage += res.st_size
        return usage


    @staticmethod
    def init_cache(cachedir, maxslots):
        """Scan cachedir and adopt existing cache blocks, up to maxslots.

        Performs a 1-byte write sanity check, then maps in the most
        recently accessed blocks (evicting any beyond maxslots) and
        cleans up stale temporary files.  Returns an OrderedDict of
        locator -> DiskCacheSlot owned by the calling process.
        """
        #
        # First check the disk cache works at all by creating a 1 byte cache entry
        # (the locator is md5("a")).
        #
        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds.set(b'a')
        if checkexists is None:
            # Don't keep the test entry around unless it existed beforehand.
            ds.evict()

        # map in all the files in the cache directory, up to max slots.
        # after max slots, try to delete the excess blocks.
        #
        # this gives the calling process ownership of all the blocks

        blocks = []
        for root, dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)

                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                    blocks.append((name[0:32], res.st_atime))
                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                    # found a temporary file more than 1 minute old,
                    # try to delete it.
                    try:
                        os.remove(blockpath)
                    except OSError:
                        pass

        # sort by access time (atime), going from most recently
        # accessed (highest timestamp) to least recently accessed
        # (lowest timestamp).
        blocks.sort(key=lambda x: x[1], reverse=True)

        # Map in all the files we found, up to maxslots, if we exceed
        # maxslots, start throwing things out.
        cachelist: collections.OrderedDict = collections.OrderedDict()
        for b in blocks:
            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
            if got is None:
                continue
            if len(cachelist) < maxslots:
                cachelist[got.locator] = got
            else:
                # we found more blocks than maxslots, try to
                # throw it out of the cache.
                got.evict()

        return cachelist
# Filename suffix that marks a file as a completed disk cache block.
cacheblock_suffix = ".keepcacheblock"
class DiskCacheSlot(object):
    """One Keep block cached as an mmap-ed, flock-protected file on disk.

    A slot starts empty; readers block in get() until a writer publishes
    the block with set().  On disk a block lives at
    <cachedir>/<locator[0:3]>/<locator> + cacheblock_suffix, and the slot
    holds a shared flock on it while mapped so that other processes
    sharing the cache directory know not to delete it.
    """

    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")

    def __init__(self, locator, cachedir):
        self.locator = locator
        # Signaled once 'content' has been decided (even when it is None).
        self.ready = threading.Event()
        self.content = None
        self.cachedir = cachedir
        self.filehandle = None
        # Optional weakref to evicted content that may still be alive
        # elsewhere; consulted by size().  Managed by external cache code.
        self.linger = None

    def get(self):
        """Block until the slot is populated, then return its contents.

        Returns None (no content), b'' (empty block), or a read-only
        mmap region.
        """
        self.ready.wait()
        # 'content' can be None, an empty byte string, or a nonempty mmap
        # region. If it is an mmap region, we want to advise the
        # kernel we're going to use it. This nudges the kernel to
        # re-read most or all of the block if necessary (instead of
        # just a few pages at a time), reducing the number of page
        # faults and improving performance by 4x compared to not
        # calling madvise.
        if self.content and hasattr(mmap.mmap, 'madvise'):
            self.content.madvise(mmap.MADV_WILLNEED)
        return self.content

    def set(self, value):
        """Publish 'value' as this slot's contents and wake waiting readers.

        Nonempty values are written to a temporary file, atomically
        renamed into place, then mmap-ed back as the slot contents.
        Returns True if a value was cached, False if value is None or the
        slot was already set.
        """
        tmpfile = None
        try:
            if value is None:
                self.content = None
                self.ready.set()
                return False

            if len(value) == 0:
                # Can't mmap a 0 length file
                self.content = b''
                self.ready.set()
                return True

            if self.content is not None:
                # Has been set already
                self.ready.set()
                return False

            blockdir = os.path.join(self.cachedir, self.locator[0:3])
            os.makedirs(blockdir, mode=0o700, exist_ok=True)

            final = os.path.join(blockdir, self.locator) + cacheblock_suffix

            self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
            tmpfile = self.filehandle.name
            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(self.filehandle, fcntl.LOCK_SH)

            self.filehandle.write(value)
            self.filehandle.flush()
            os.rename(tmpfile, final)
            tmpfile = None

            self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            # only set the event when mmap is successful
            self.ready.set()
            return True
        finally:
            if tmpfile is not None:
                # If the tempfile hasn't been renamed on disk yet, try to delete it.
                try:
                    os.remove(tmpfile)
                except OSError:
                    # Best-effort cleanup; nothing useful to do on failure.
                    pass

    def size(self):
        """Return the number of bytes this slot currently accounts for."""
        if self.content is None:
            if self.linger is not None:
                # If it is still lingering (object is still accessible
                # through the weak reference) it is still taking up
                # space.
                content = self.linger()
                if content is not None:
                    return len(content)
            return 0
        else:
            return len(self.content)

    def evict(self):
        """Delete this block's file from disk if no other process holds it.

        Returns True when the file was removed, False when there was
        nothing to evict or another process still has the block locked.
        """
        if not self.content:
            return False

        # The mmap region might be in use when we decided to evict
        # it. This can happen if the cache is too small.
        #
        # If we call close() now, it'll throw an error if
        # something tries to access it.
        #
        # However, we don't need to explicitly call mmap.close()
        #
        # I confirmed in mmapmodule.c that both close
        # and deallocate do the same thing:
        #
        # a) close the file descriptor
        # b) unmap the memory range
        #
        # So we can forget it in the cache and delete the file on
        # disk, and it will tear it down after any other
        # lingering Python references to the mapped memory are
        # gone.

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
        try:
            fcntl.flock(self.filehandle, fcntl.LOCK_UN)

            # try to get an exclusive lock, this ensures other
            # processes are not using the block. It is
            # nonblocking and will throw an exception if we
            # can't get it, which is fine because that means
            # we just won't try to delete it.
            #
            # I should note here, the file locking is not
            # strictly necessary, we could just remove it and
            # the kernel would ensure that the underlying
            # inode remains available as long as other
            # processes still have the file open. However, if
            # you have multiple processes sharing the cache
            # and deleting each other's files, you'll end up
            # with a bunch of ghost files that don't show up
            # in the file system but are still taking up
            # space, which isn't particularly user friendly.
            # The locking strategy ensures that cache blocks
            # in use remain visible.
            #
            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

            os.remove(final)
            return True
        except OSError:
            # Another process holds the lock, or the file is already gone.
            return False
        finally:
            self.filehandle = None
            self.content = None

    @staticmethod
    def get_from_disk(locator, cachedir):
        """Return a ready DiskCacheSlot mapped from an existing cache file.

        Returns None if the block is not cached on disk or cannot be
        opened/mapped.
        """
        blockdir = os.path.join(cachedir, locator[0:3])
        final = os.path.join(blockdir, locator) + cacheblock_suffix

        try:
            filehandle = open(final, "rb")

            # acquire a shared lock, this tells other processes that
            # we're using this block and to please not delete it.
            fcntl.flock(filehandle, fcntl.LOCK_SH)

            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
            dc = DiskCacheSlot(locator, cachedir)
            dc.filehandle = filehandle
            dc.content = content
            dc.ready.set()
            return dc
        except FileNotFoundError:
            pass
        except Exception:
            traceback.print_exc()

        return None

    @staticmethod
    def cache_usage(cachedir):
        """Return the total size in bytes of all cache block files under cachedir."""
        usage = 0
        for root, dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)
                usage += res.st_size
        return usage


    @staticmethod
    def init_cache(cachedir, maxslots):
        """Scan cachedir and adopt existing cache blocks, up to maxslots.

        Performs a 1-byte write sanity check, then maps in the most
        recently accessed blocks (evicting any beyond maxslots) and
        cleans up stale temporary files.  Returns an OrderedDict of
        locator -> DiskCacheSlot owned by the calling process.
        """
        #
        # First check the disk cache works at all by creating a 1 byte cache entry
        # (the locator is md5("a")).
        #
        checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
        ds.set(b'a')
        if checkexists is None:
            # Don't keep the test entry around unless it existed beforehand.
            ds.evict()

        # map in all the files in the cache directory, up to max slots.
        # after max slots, try to delete the excess blocks.
        #
        # this gives the calling process ownership of all the blocks

        blocks = []
        for root, dirs, files in os.walk(cachedir):
            for name in files:
                if not name.endswith(cacheblock_suffix):
                    continue

                blockpath = os.path.join(root, name)
                res = os.stat(blockpath)

                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                    blocks.append((name[0:32], res.st_atime))
                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                    # found a temporary file more than 1 minute old,
                    # try to delete it.
                    try:
                        os.remove(blockpath)
                    except OSError:
                        pass

        # sort by access time (atime), going from most recently
        # accessed (highest timestamp) to least recently accessed
        # (lowest timestamp).
        blocks.sort(key=lambda x: x[1], reverse=True)

        # Map in all the files we found, up to maxslots, if we exceed
        # maxslots, start throwing things out.
        cachelist: collections.OrderedDict = collections.OrderedDict()
        for b in blocks:
            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
            if got is None:
                continue
            if len(cachelist) < maxslots:
                cachelist[got.locator] = got
            else:
                # we found more blocks than maxslots, try to
                # throw it out of the cache.
                got.evict()

        return cachelist
def get(self):
    """Wait until the slot has been filled in, then return its contents.

    The result is None (no content), b'' (empty block), or a read-only
    mmap region.
    """
    self.ready.wait()
    content = self.content
    # A nonempty block is an mmap region; where madvise is available,
    # hint the kernel that we are about to read the whole mapping.
    # Prefetching the block in bulk instead of faulting in a few pages
    # at a time is roughly a 4x speedup.
    if content and hasattr(mmap.mmap, 'madvise'):
        content.madvise(mmap.MADV_WILLNEED)
    return content
def set(self, value):
    """Publish 'value' as this slot's contents and wake waiting readers.

    Nonempty values are written to a temporary file, atomically renamed
    into place, then mmap-ed back as the slot contents.  Returns True if
    a value was cached, False if value is None or the slot was already
    set.
    """
    tmpfile = None
    try:
        if value is None:
            self.content = None
            self.ready.set()
            return False

        if len(value) == 0:
            # Can't mmap a 0 length file
            self.content = b''
            self.ready.set()
            return True

        if self.content is not None:
            # Has been set already
            self.ready.set()
            return False

        blockdir = os.path.join(self.cachedir, self.locator[0:3])
        os.makedirs(blockdir, mode=0o700, exist_ok=True)

        final = os.path.join(blockdir, self.locator) + cacheblock_suffix

        self.filehandle = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
        tmpfile = self.filehandle.name
        os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)

        # acquire a shared lock, this tells other processes that
        # we're using this block and to please not delete it.
        fcntl.flock(self.filehandle, fcntl.LOCK_SH)

        self.filehandle.write(value)
        self.filehandle.flush()
        os.rename(tmpfile, final)
        tmpfile = None

        self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
        # only set the event when mmap is successful
        self.ready.set()
        return True
    finally:
        if tmpfile is not None:
            # If the tempfile hasn't been renamed on disk yet, try to delete it.
            try:
                os.remove(tmpfile)
            except OSError:
                # Best-effort cleanup; nothing useful to do on failure.
                pass
def size(self):
    """Return the number of bytes this slot currently accounts for."""
    if self.content is not None:
        return len(self.content)
    if self.linger is not None:
        # Evicted, but the mapped region may still be reachable through
        # the weak reference, in which case it is still occupying space.
        lingering = self.linger()
        if lingering is not None:
            return len(lingering)
    return 0
def evict(self):
    """Delete this block's file from disk if no other process holds it.

    Returns True when the file was removed, False when there was
    nothing to evict or another process still has the block locked.
    """
    if not self.content:
        return False

    # The mmap region might be in use when we decided to evict
    # it. This can happen if the cache is too small.
    #
    # If we call close() now, it'll throw an error if
    # something tries to access it.
    #
    # However, we don't need to explicitly call mmap.close()
    #
    # I confirmed in mmapmodule.c that both close
    # and deallocate do the same thing:
    #
    # a) close the file descriptor
    # b) unmap the memory range
    #
    # So we can forget it in the cache and delete the file on
    # disk, and it will tear it down after any other
    # lingering Python references to the mapped memory are
    # gone.

    blockdir = os.path.join(self.cachedir, self.locator[0:3])
    final = os.path.join(blockdir, self.locator) + cacheblock_suffix
    try:
        fcntl.flock(self.filehandle, fcntl.LOCK_UN)

        # try to get an exclusive lock, this ensures other
        # processes are not using the block. It is
        # nonblocking and will throw an exception if we
        # can't get it, which is fine because that means
        # we just won't try to delete it.
        #
        # I should note here, the file locking is not
        # strictly necessary, we could just remove it and
        # the kernel would ensure that the underlying
        # inode remains available as long as other
        # processes still have the file open. However, if
        # you have multiple processes sharing the cache
        # and deleting each other's files, you'll end up
        # with a bunch of ghost files that don't show up
        # in the file system but are still taking up
        # space, which isn't particularly user friendly.
        # The locking strategy ensures that cache blocks
        # in use remain visible.
        #
        fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)

        os.remove(final)
        return True
    except OSError:
        # Another process holds the lock, or the file is already gone.
        return False
    finally:
        self.filehandle = None
        self.content = None
@staticmethod
def get_from_disk(locator, cachedir):
    """Return a ready DiskCacheSlot mapped from an existing cache file.

    Returns None if the block is not cached on disk or cannot be
    opened/mapped.
    """
    blockdir = os.path.join(cachedir, locator[0:3])
    final = os.path.join(blockdir, locator) + cacheblock_suffix

    try:
        filehandle = open(final, "rb")

        # acquire a shared lock, this tells other processes that
        # we're using this block and to please not delete it.
        fcntl.flock(filehandle, fcntl.LOCK_SH)

        content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
        dc = DiskCacheSlot(locator, cachedir)
        dc.filehandle = filehandle
        dc.content = content
        dc.ready.set()
        return dc
    except FileNotFoundError:
        pass
    except Exception:
        traceback.print_exc()

    return None
@staticmethod
def
cache_usage(cachedir):
191 @staticmethod 192 def cache_usage(cachedir): 193 usage = 0 194 for root, dirs, files in os.walk(cachedir): 195 for name in files: 196 if not name.endswith(cacheblock_suffix): 197 continue 198 199 blockpath = os.path.join(root, name) 200 res = os.stat(blockpath) 201 usage += res.st_size 202 return usage
@staticmethod
def init_cache(cachedir, maxslots):
    """Scan cachedir and adopt existing cache blocks, up to maxslots.

    Performs a 1-byte write sanity check, then maps in the most
    recently accessed blocks (evicting any beyond maxslots) and cleans
    up stale temporary files.  Returns an OrderedDict of locator ->
    DiskCacheSlot owned by the calling process.
    """
    #
    # First check the disk cache works at all by creating a 1 byte cache entry
    # (the locator is md5("a")).
    #
    checkexists = DiskCacheSlot.get_from_disk('0cc175b9c0f1b6a831c399e269772661', cachedir)
    ds = DiskCacheSlot('0cc175b9c0f1b6a831c399e269772661', cachedir)
    ds.set(b'a')
    if checkexists is None:
        # Don't keep the test entry around unless it existed beforehand.
        ds.evict()

    # map in all the files in the cache directory, up to max slots.
    # after max slots, try to delete the excess blocks.
    #
    # this gives the calling process ownership of all the blocks

    blocks = []
    for root, dirs, files in os.walk(cachedir):
        for name in files:
            if not name.endswith(cacheblock_suffix):
                continue

            blockpath = os.path.join(root, name)
            res = os.stat(blockpath)

            if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
                blocks.append((name[0:32], res.st_atime))
            elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
                # found a temporary file more than 1 minute old,
                # try to delete it.
                try:
                    os.remove(blockpath)
                except OSError:
                    pass

    # sort by access time (atime), going from most recently
    # accessed (highest timestamp) to least recently accessed
    # (lowest timestamp).
    blocks.sort(key=lambda x: x[1], reverse=True)

    # Map in all the files we found, up to maxslots, if we exceed
    # maxslots, start throwing things out.
    cachelist: collections.OrderedDict = collections.OrderedDict()
    for b in blocks:
        got = DiskCacheSlot.get_from_disk(b[0], cachedir)
        if got is None:
            continue
        if len(cachelist) < maxslots:
            cachelist[got.locator] = got
        else:
            # we found more blocks than maxslots, try to
            # throw it out of the cache.
            got.evict()

    return cachelist