diff options
Diffstat (limited to 'FuseArchive')
-rw-r--r-- | FuseArchive/ChunkFile.py | 536 | ||||
-rw-r--r-- | FuseArchive/FileSystem.py | 154 | ||||
-rw-r--r-- | FuseArchive/Serializer.py | 40 | ||||
-rw-r--r-- | FuseArchive/__init__.py | 5 |
4 files changed, 735 insertions, 0 deletions
diff --git a/FuseArchive/ChunkFile.py b/FuseArchive/ChunkFile.py new file mode 100644 index 0000000..546e9c0 --- /dev/null +++ b/FuseArchive/ChunkFile.py @@ -0,0 +1,536 @@ +import logging, os, errno, fcntl, fuse, FuseArchive +from binascii import hexlify +from FuseArchive.Serializer import Serializer + +# These control some of the file output +magic_blocksize = 1024 * 128 +# Use a tiny block size to debug writes, so you can use a smaller test file +#magic_blocksize = 1024 +magic_depth = 5 +gzip_compress_level = 6 +chunkstyle = 'fs' + +# Memory for dirty blocks, per file (1M) +dirty_size = 1024 * 1024 * 1; +# This is the number of actualy blocks in that size +dirty_flush = int( dirty_size / magic_blocksize ) + +# This is a cache of open files by inode, to fix the lseek == size problem +# this causes a failure in fsx-linux becuase to to lseek(fd,0,seek_end) it +# apparently does a getattr to find the file length then subtracts the +# offset from that to pass to write or whatever, since the offset is passed +# to write and we don't maintain one internally. xmp.py also fails this +# test. +dirty_cache = {} + +def flag2mode(flags): + md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'} + m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)] + + if flags & os.O_APPEND: + m = m.replace('w', 'a', 1) + + return m + +class FuseArchiveStat(fuse.Stat): + def __init__(self, stat): + self.st_mode = stat.st_mode + self.st_ino = stat.st_ino + self.st_dev = stat.st_dev + self.st_rdev = stat.st_rdev + self.st_nlink = stat.st_nlink + self.st_uid = stat.st_uid + self.st_gid = stat.st_gid + self.st_size = stat.st_size + self.st_atime = stat.st_atime + self.st_mtime = stat.st_mtime + self.st_ctime = stat.st_mtime + self.st_blocks = stat.st_blocks + self.st_blksize = stat.st_blksize + + def overstat( self, size ): + self.st_size = size + # Yeah we shouldn't always just add 1 + self.st_blocks = int( self.st_size / 512 ) + 1 + +class ChunkFile(object): + + def __init__(self, path, flags, *mode): + # Inflate the file + logging.debug( "Init file: " + path ) + self.orig_path = path + + # init rw and offset + self.rd = False + self.wr = False + self.size = 0 + self.modified = False + + # This is the current in-memory chunk and offset in to data[] + self.chunk_cache = {}; + self.chunk = '' + self.chunk_index = -1 + self.chunk_modified = False + self.chunk_size = magic_blocksize + self.dirty_chunks = 0 + + # The chunk table + self.chunks = [] + + # TODO: Better flag handling here? + if flags & os.O_RDONLY: + self.rd = True + + if flags & os.O_RDWR: + self.rd = True + self.wr = True + + if flags & os.O_WRONLY: + self.wr = True + + if flags & os.O_APPEND: + self.wr = True + + if os.path.exists( "./tree" + self.orig_path ): + preexist = True + else: + preexist = False + + # Open the file now and keep the fh around so that cp -a on r/o + # files works (in the create a read-only file for writing case) + src = "./tree" + path + logging.debug( "Saving fh for " + src ) + nflags = os.O_RDWR | os.O_APPEND + if flags & os.O_CREAT: + logging.debug( "Adding O_CREAT" ) + nflags = nflags | os.O_CREAT + + self.file = os.fdopen( os.open( src, nflags, *mode ), + flag2mode( nflags ) ) + + if preexist: + # Read in file info table + logging.debug( "Unpickling: %s" % self.file ) + # TODO: return an IO error if inflating fails + try: + magic = Serializer.loadfh( self.file ) + logging.debug( "Got data: %s" % magic ) + self.size = magic[ 'size' ] + self.chunks = magic[ 'chunks' ] + self.chunk_size = magic[ 'chunk_size' ] + except Exception, e: + logging.critical( self.orig_path + ": " + str( e ) ) + else: + if self.wr: + logging.debug( "File doesn't exist and we're going to write, creating temp empty file" ) + self.modified = True + self.flush() + + self.direct_io = False + self.keep_cache = False + + logging.debug( "%s init complete" % self ) + + def _load_chunk( self, index ): + # If the current chunk is the same as the chunk we're loading + # just return + logging.debug( "_load_chunk: %d" % index ) + + if index == self.chunk_index: + logging.debug( "Load chunk is same as current chunk, all done" ) + return + + # Save this chunk if modified + self._save_chunk() + + logging.debug( "Loading chunk %d" % index ) + key = None + + size = len( self.chunks ) + if index >= size: + logging.debug( "Index doesn't exist" ) + else: + key = self.chunks[ index ] + + if key: + if isinstance( key, str ): + logging.debug( "Found cached dirty page" ) + self.chunk = key + else: + logging.debug( "Index: %s" % key ) + self.chunk = load_chunk( key ) + else: + logging.debug( "No chunk at this index, loading nothing" ) + self.chunk = '' + + logging.debug( "Loaded chunk of length: %d" % len( self.chunk ) ) + + self.chunk_index = index + self.chunk_modified = False + + # This simply puts the chunk data inside our current chunks at chunk_index + def _save_chunk(self): + if self.chunk_modified: + logging.debug( "Saving chunk %d" % self.chunk_index ) + + # Make sure we have room for this chunk + size = len( self.chunks ) + if self.chunk_index >= size: + self.chunks.extend( [ '' ] * ( self.chunk_index -size + 1 ) ) + + # Increment dirty chunks if we had a key here already + if isinstance( self.chunks[ self.chunk_index ], list ) or \ + len( self.chunks[ self.chunk_index ] ) == 0: + self.dirty_chunks += 1 + logging.debug( "Dirty chunks is now: %d" % self.dirty_chunks ) + logging.debug( "Dirty flush at: %d" % dirty_flush ) + + # Save the dirty chunk temporarily in memory + self.chunks[ self.chunk_index ] = self.chunk + + # Flush if we have too many dirty chunks + if self.dirty_chunks > dirty_flush: + self._flush_chunks() + + # This flushes any cached chunks + def _flush_chunks(self): + for index in range( len( self.chunks ) ): + if isinstance( self.chunks[ index ], str ): + logging.debug( "Flushing chunk at %d" % index ) + key = save_chunk( self.chunks[ index ] ) + self.chunks[ index ] = key + logging.debug( "Key was %s" % key ) + self.dirty_chunks = 0 + + def read(self, length, offset): + logging.debug( "Reading from %s offset: %d (0x%x) length: %d (0x%d)" % + ( self.orig_path, offset, offset, length, length ) ) + + data_read = 0 + data = '' + index = int( offset / self.chunk_size ) + rest = offset % self.chunk_size + is_eof = False + + # Keep reading chunks until we have at least this much data + while data_read < length and not is_eof: + logging.debug( "Pulling chunk data: %d" % index ) + self._load_chunk( index ) + if len(self.chunk): + chunk_remaining = len(self.chunk) - rest + to_read = chunk_remaining + data_left = length - data_read + if data_left < chunk_remaining: + to_read = data_left + + logging.debug( "chunk_remaining: %d" % chunk_remaining ) + logging.debug( "data_left: %d" % data_left ) + logging.debug( "data_read: %d" % data_read ) + logging.debug( "rest: %d" % rest ) + logging.debug( "Copying %d bytes" % to_read ) + + data += self.chunk[ rest:(rest+to_read) ] + data_read += to_read + index += 1 + rest = 0 + else: + logging.debug( "No more chunk data, bye" ) + is_eof = True + + logging.debug( "Returning %d bytes of data" % len( data ) ) + logging.debug( "Internal count was: %d" % data_read ) + return data + + def write(self, buf, offset): + if magic_profiling: + return len( buf ) + + logging.debug( "Writing to %s offset: %d (0x%x) length: %d (0x%x)" % + ( self.orig_path, offset, offset, len( buf ), len( buf ) ) ) + + index = int( offset / self.chunk_size ) + rest = offset % self.chunk_size + + logging.debug( "This chunk falls on index: %d rest: %d" % ( index, rest ) ) + logging.debug( "We have %d chunks" % len( self.chunks ) ) + logging.debug( "File size is: %d" % self.size ) + + # If index is higher than the number of blocks we current have it's a seek hole, so we need to extend our blocks out + # We know these need to essentially be zeroed up to this size since + if len( self.chunks ) - 1 < index: + logging.debug( "Not enough chunks %d, need %d, extending" % + ( len( self.chunks ), index + 1 ) ) + this_index = 0 + while this_index < index: + self._load_chunk( this_index ) + fill_null = self.chunk_size - len(self.chunk) + logging.debug( "Filling this chunk with null, bytes: %d" % fill_null ) + self.chunk += "\0" * fill_null + logging.debug( "Chunk is now: %d bytes" % len( self.chunk) ) + self.chunk_modified = True + self._save_chunk() + this_index += 1 + + self._load_chunk( index ) + + # Now check if this chunk needs to be extended + if len( self.chunk ) < rest: + fill_null = rest - len(self.chunk) + logging.debug( "Filling final chunk with null, bytes: %d" % fill_null ) + self.chunk += "\0" * fill_null + self.chunk_modified = True + self._save_chunk() + + buf_offset = 0 + buf_len = len(buf) + + logging.debug( "Length: %d" % buf_len ) + while( buf_offset < buf_len ): + logging.debug( "Pulling in chunk for writing: %d" % index ) + self._load_chunk( index ) + buf_remain = buf_len - buf_offset + chunk_remain = self.chunk_size - rest + + logging.debug( "buf_remain: %d" % buf_remain ) + logging.debug( "chunk_remain: %d" % chunk_remain ) + + if chunk_remain < buf_remain: + logging.debug( "Writing %d bytes, buffer boundry" % chunk_remain ) + this_len = chunk_remain + else: + logging.debug( "Writing final %d bytes" % buf_remain ) + this_len = buf_remain + + logging.debug( "Bytes to copy: %d" % this_len ) + logging.debug( " buf offset: %d" % buf_offset ) + logging.debug( " chunk offset: %d" % rest ) + + if FuseArchive.deep_debug: + logging.debug( "Pre-Buf: %s" % hexlify(buf) ) + logging.debug( "Pre-Chunk: %s" % hexlify(self.chunk) ) + + # Since python doesn't do in-place reassignment like you + # can with splice() we will reconstruct the data by joining + # stuff by offsets (first chars to skip, then our joining + # buf chunk, the everything that would have been after it) + self.chunk = self.chunk[ :rest ] + \ + buf[ buf_offset:(buf_offset+this_len) ] + \ + self.chunk[ (rest + this_len): ] + + if FuseArchive.deep_debug: + logging.debug( "Post-Buf: %s" % hexlify(buf) ) + logging.debug( "Post-Chunk: %s" % hexlify(self.chunk) ) + + buf_offset += this_len + + # Advance to next block + rest = 0 + index += 1 + self.chunk_modified = True + + self._save_chunk() + self.modified = True + if offset + len(buf) > self.size: + self.size = offset + len(buf) + + logging.debug( "This chunk size is now: %d" % len( self.chunk ) ) + logging.debug( "File size is now: %d" % self.size ) + logging.debug( "Num Chunks: %d" % len( self.chunks ) ) + + # Mark us in the dirty cache + dirty_cache[ self.orig_path ] = self + + return len(buf) + + # BUG: If you cp -a a file then quickly ls -l sometimes it doesn't show + # up right? like wrong size and stuff? + # Maybe because release doesn't return a fuse message and is async? + def release(self, flags): + # Deflate the file + logging.debug( "Release: " + self.orig_path ) + self.flush() + self.file.close() + + def _fflush(self): + if self.wr and self.modified: + logging.debug( "_fflush!" ) + # Save our main data + self._save_chunk() + + # And flush any cached chunks + self._flush_chunks() + + save_size = self.size + + # Figure out our size based on the number of chunks + the + # len of the final chunk + numchunks = len( self.chunks ) + if numchunks > 0: + # Load the last chunk + logging.debug( "We have %d chunks, calculating size" % numchunks ) + self._load_chunk( numchunks - 1 ) + self.size = ( numchunks - 1 ) * self.chunk_size + \ + len( self.chunk ) + else: + logging.debug( "No chunks, setting size to zero" ) + self.size = 0 + + # If this assert fails then write/ftruncate failed to set + # things up right somewhere + assert save_size == self.size, "Calculated size of " \ + + self.orig_path + " = " + str( self.size ) \ + + " doesn't match internal size " + str( save_size ) \ + + "\nProbably a bug in write or ftruncate!" + logging.debug( "Size calculated is: %d (0x%x)" % ( self.size, self.size ) ) + + Serializer.dumpfh( self.file, { + 'size': self.size, + 'chunks': self.chunks, + 'chunk_size': self.chunk_size + } ) + + # Not dirty anymore + if self.orig_path in dirty_cache: + del dirty_cache[ self.orig_path ] + + + logging.debug( "_fflush exit" ) + return 1 + + + # Currently we treat fsync as flush since we don't keep any data + # hanging around anyway in fh stuff + def fsync(self, isfsyncfile): + logging.debug( "fsync " + self.orig_path ) + self._fflush() + #if isfsyncfile and hasattr(os, 'fdatasync'): + # os.fdatasync(self.fd) + #else: + # os.fsync(self.fd) + + def flush(self): + logging.debug( "flush " + self.orig_path ) + self._fflush() + + def fgetattr(self): + logging.debug( "Overridding fgetattr" ) + stats = FuseArchiveStat( os.lstat( "./tree" + self.orig_path ) ) + + # Fixed in write? + #if self.modified: + # We would need to fsync here to recalc size, but don't do + # it unless modified? otherwise simple getattr will be + # rewriting a ton of files + # print "WARNING: self.modified causes fgetattr to be incorrect!" + + stats.overstat( self.size ) + return stats + + def ftruncate(self, length): + if not self.wr: + return errno.IOError + + curr_chunks = len( self.chunks ) + need_chunks = ( length / self.chunk_size ) + extra_bytes = length % self.chunk_size + logging.debug( "Ftruncate - %d (0x%x)" % ( length, length ) ) + logging.debug( " - self.size: %d" % self.size ) + logging.debug( " - curr_chunks: %d" % curr_chunks ) + logging.debug( " - need_chunks: %d" % need_chunks ) + logging.debug( " - extra_bytes: %d" % extra_bytes ) + + if extra_bytes: + logging.debug( "Need an extra chunk" ) + need_chunks += 1 + + self._load_chunk( 0 ) + + if length == 0: + logging.debug( "Creating 0 chunk file" ) + self.chunks = [] + self.chunk = '' + elif self.size <= length: + logging.debug( "Need to pad out file, writing/seeking to %d" % length ) + + # Just write out null bytes to the length requested, write will do this for us if we specify the offset + self.write( '', length ) + else: + logging.debug( "Truncating chunks" ) + while True: + logging.debug( "Need chunks: %d curr: %d" % ( need_chunks, curr_chunks ) ) + if need_chunks == curr_chunks: + break + + logging.debug( "Deleting chunk %d" % self.chunk_index ) + self.chunks.pop() + curr_chunks = len( self.chunks ) + + # Now make sure this chunk is the right size, first load the + # last chunk + if len( self.chunks ): + self._load_chunk( len( self.chunks ) - 1 ) + logging.debug( "Loaded final chunk, len: %d" % len( self.chunk ) ) + + # Now truncate this item if needed + if len( self.chunk ) > extra_bytes: + logging.debug( "Truncating final chunk to %d" % extra_bytes ) + self.chunk = self.chunk[ :extra_bytes ] + logging.debug( "Chunk is now: %d bytes" % len( self.chunk ) ) + + self.chunk_modified = True + self.modified = True + self.size = length + self._load_chunk( 0 ) + + logging.debug( "ftruncate complete" ) + self._fflush() + + def lock(self, cmd, owner, **kw): + logging.debug( "WARNING: locking unsupported" ) + return 1 + + # The code here is much rather just a demonstration of the locking + # API than something which actually was seen to be useful. + + # Advisory file locking is pretty messy in Unix, and the Python + # interface to this doesn't make it better. + # We can't do fcntl(2)/F_GETLK from Python in a platfrom independent + # way. The following implementation *might* work under Linux. + # + # if cmd == fcntl.F_GETLK: + # import struct + # + # lockdata = struct.pack('hhQQi', kw['l_type'], os.SEEK_SET, + # kw['l_start'], kw['l_len'], kw['l_pid']) + # ld2 = fcntl.fcntl(self.fd, fcntl.F_GETLK, lockdata) + # flockfields = ('l_type', 'l_whence', 'l_start', 'l_len', 'l_pid') + # uld2 = struct.unpack('hhQQi', ld2) + # res = {} + # for i in xrange(len(uld2)): + # res[flockfields[i]] = uld2[i] + # + # return fuse.Flock(**res) + + # Convert fcntl-ish lock parameters to Python's weird + # lockf(3)/flock(2) medley locking API... + op = { fcntl.F_UNLCK : fcntl.LOCK_UN, + fcntl.F_RDLCK : fcntl.LOCK_SH, + fcntl.F_WRLCK : fcntl.LOCK_EX }[kw['l_type']] + if cmd == fcntl.F_GETLK: + return -errno.EOPNOTSUPP + elif cmd == fcntl.F_SETLK: + if op != fcntl.LOCK_UN: + op |= fcntl.LOCK_NB + elif cmd == fcntl.F_SETLKW: + pass + else: + return -errno.EINVAL + + fcntl.lockf(self.fd, op, kw['l_start'], kw['l_len']) + + @staticmethod + def get_dirty_file( path ): + if path in dirty_cache: + return( dirty_cache[ path ] ) + else: + return None diff --git a/FuseArchive/FileSystem.py b/FuseArchive/FileSystem.py new file mode 100644 index 0000000..dbdc8a3 --- /dev/null +++ b/FuseArchive/FileSystem.py @@ -0,0 +1,154 @@ +import fuse, os, logging, errno +from ChunkFile import ChunkFile + +if not hasattr(fuse, '__version__'): + raise RuntimeError, \ + "your fuse-py doesn't know of fuse.__version__, probably it's too old." + +fuse.fuse_python_api = (0, 2) +fuse.feature_assert('stateful_files', 'has_init') + +class FileSystem(fuse.Fuse): + + def __init__(self, *args, **kw): + + fuse.Fuse.__init__(self, *args, **kw) + self.root = None + + # Fix getattr and fgetattr to? + def getattr(self, path): + treefile = "./tree" + path + + if os.path.isfile( treefile ): + logging.debug( "Delegating getattr to File for " + path ) + + # Check in the dirty cache first (to handle lseek and the + # relatively broken implmentation in fuse/python) + f = ChunkFile.get_dirty_file( path ) + if f: + logging.info( "WORKAROUND: lseek appears to do a gettattr if whence is SEEK_END, using dirty cache object" ) + stats = f.fgetattr() + # no release, it's still being used + else: + f = ChunkFile( path, os.O_RDONLY, 0 ) + stats = f.fgetattr() + f.release( 0 ) + else: + logging.debug( "Using os.lstat to get stats" ) + stats = os.lstat( treefile ) + + return stats + + def readlink(self, path): + return os.readlink("./tree" + path) + + def readdir(self, path, offset): + for e in os.listdir("./tree" + path): + yield fuse.Direntry(e) + + def unlink(self, path): + os.unlink("./tree" + path) + + def rmdir(self, path): + os.rmdir("./tree" + path) + + def symlink(self, path, path1): + os.symlink(path, "./tree" + path1) + + def rename(self, path, path1): + os.rename("./tree" + path, "./tree" + path1) + + def link(self, path, path1): + os.link("./tree" + path, "./tree" + path1) + + def chmod(self, path, mode): + os.chmod("./tree" + path, mode) + + def chown(self, path, user, group): + os.chown("./tree" + path, user, group) + + def truncate(self, path, len): + # Truncate using the ftruncate on the file + logging.debug( "Using FuseArchiveFile to truncate %s to %d" % ( path, len) ) + f = ChunkFile( path, os.O_RDWR, 0 ) + f.ftruncate(len) + f.release( 0 ) + + def mknod(self, path, mode, dev): + os.mknod("./tree" + path, mode, dev) + + def mkdir(self, path, mode): + os.mkdir("./tree" + path, mode) + + def utime(self, path, times): + os.utime("./tree" + path, times) + +# The following utimens method would do the same as the above utime method. +# We can't make it better though as the Python stdlib doesn't know of +# subsecond preciseness in acces/modify times. +# +# def utimens(self, path, ts_acc, ts_mod): +# os.utime("." + path, (ts_acc.tv_sec, ts_mod.tv_sec)) + + def access(self, path, mode): + if not os.access("./tree" + path, mode): + return -errno.EACCES + +# This is how we could add stub extended attribute handlers... +# (We can't have ones which aptly delegate requests to the underlying fs +# because Python lacks a standard xattr interface.) +# +# def getxattr(self, path, name, size): +# val = name.swapcase() + '@' + path +# if size == 0: +# # We are asked for size of the value. +# return len(val) +# return val +# +# def listxattr(self, path, size): +# # We use the "user" namespace to please XFS utils +# aa = ["user." + a for a in ("foo", "bar")] +# if size == 0: +# # We are asked for size of the attr list, ie. joint size of attrs +# # plus null separators. +# return len("".join(aa)) + len(aa) +# return aa + + def statfs(self): + """ + Should return an object with statvfs attributes (f_bsize, f_frsize...). + Eg., the return value of os.statvfs() is such a thing (since py 2.2). + If you are not reusing an existing statvfs object, start with + fuse.StatVFS(), and define the attributes. + + To provide usable information (ie., you want sensible df(1) + output, you are suggested to specify the following attributes: + + - f_bsize - preferred size of file blocks, in bytes + - f_frsize - fundamental size of file blcoks, in bytes + [if you have no idea, use the same as blocksize] + - f_blocks - total number of blocks in the filesystem + - f_bfree - number of free blocks + - f_files - total number of file inodes + - f_ffree - nunber of free file inodes + """ + + return os.statvfs(".") + + def fsinit(self): + os.chdir(self.root) + + def main(self, *a, **kw): + + self.file_class = ChunkFile + + # This is where fragments go + if not os.path.exists( 'storage' ): + os.mkdir( 'storage' ) + + # This is where the real files exist + if not os.path.exists( 'tree' ): + os.mkdir( 'tree' ) + + return fuse.Fuse.main(self, *a, **kw) + diff --git a/FuseArchive/Serializer.py b/FuseArchive/Serializer.py new file mode 100644 index 0000000..b2e1592 --- /dev/null +++ b/FuseArchive/Serializer.py @@ -0,0 +1,40 @@ +import gzip, FuseArchive + +class Serializer: + """This lets us experiment with different main file serializers""" + @staticmethod + def dump( f, obj ): + out = FuseArchiveStream.open( f, "wb" ) + Serializer.dumpfh( obj, out ) # new file format + out.close() + + @staticmethod + def dumpfh( fh, obj ): + logging.debug( "Going to serialize %s to %s" % ( obj, fh ) ) + fh.truncate( 0 ) + fh.seek( 0 ) + f = gzip.GzipFile( None, "wb", gzip_compress_level, fh ) + #f = fh + cPickle.dump( obj, f, -1 ) + del f + fh.flush() + + @staticmethod + def load( f ): + if FuseArchive.magic_profiling: + return { 'size': 0, 'chunks': 0, 'chunk_size': 0 } + + inp = open( f, "rb" ) + magic = Serializer.loadfh( inp ) + inp.close() + return magic + + @staticmethod + def loadfh( fh ): + logging.debug( "Going to load from %s" % fh ) + fh.seek( 0 ) + f = gzip.GzipFile( None, "rb", gzip_compress_level, fh ) + #f = fh + magic = cPickle.load( f ) + return( magic ) + diff --git a/FuseArchive/__init__.py b/FuseArchive/__init__.py new file mode 100644 index 0000000..5883d80 --- /dev/null +++ b/FuseArchive/__init__.py @@ -0,0 +1,5 @@ +from FileSystem import FileSystem + +magic_profiling = False +deep_debug = False + |