From 8f02874e8e45d21a90f3c880eaf1aafc3e852951 Mon Sep 17 00:00:00 2001 From: Steve Slaven Date: Mon, 3 Aug 2009 21:47:05 -0700 Subject: Split up main classes diff --git a/FuseArchive/ChunkFile.py b/FuseArchive/ChunkFile.py new file mode 100644 index 0000000..546e9c0 --- /dev/null +++ b/FuseArchive/ChunkFile.py @@ -0,0 +1,536 @@ +import logging, os, errno, fcntl, fuse, FuseArchive +from binascii import hexlify +from FuseArchive.Serializer import Serializer + +# These control some of the file output +magic_blocksize = 1024 * 128 +# Use a tiny block size to debug writes, so you can use a smaller test file +#magic_blocksize = 1024 +magic_depth = 5 +gzip_compress_level = 6 +chunkstyle = 'fs' + +# Memory for dirty blocks, per file (1M) +dirty_size = 1024 * 1024 * 1; +# This is the number of actualy blocks in that size +dirty_flush = int( dirty_size / magic_blocksize ) + +# This is a cache of open files by inode, to fix the lseek == size problem +# this causes a failure in fsx-linux becuase to to lseek(fd,0,seek_end) it +# apparently does a getattr to find the file length then subtracts the +# offset from that to pass to write or whatever, since the offset is passed +# to write and we don't maintain one internally. xmp.py also fails this +# test. +dirty_cache = {} + +def flag2mode(flags): + md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'} + m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)] + + if flags & os.O_APPEND: + m = m.replace('w', 'a', 1) + + return m + +class FuseArchiveStat(fuse.Stat): + def __init__(self, stat): + self.st_mode = stat.st_mode + self.st_ino = stat.st_ino + self.st_dev = stat.st_dev + self.st_rdev = stat.st_rdev + self.st_nlink = stat.st_nlink + self.st_uid = stat.st_uid + self.st_gid = stat.st_gid + self.st_size = stat.st_size + self.st_atime = stat.st_atime + self.st_mtime = stat.st_mtime + self.st_ctime = stat.st_mtime + self.st_blocks = stat.st_blocks + self.st_blksize = stat.st_blksize + + def overstat( self, size ): + self.st_size = size + # Yeah we shouldn't always just add 1 + self.st_blocks = int( self.st_size / 512 ) + 1 + +class ChunkFile(object): + + def __init__(self, path, flags, *mode): + # Inflate the file + logging.debug( "Init file: " + path ) + self.orig_path = path + + # init rw and offset + self.rd = False + self.wr = False + self.size = 0 + self.modified = False + + # This is the current in-memory chunk and offset in to data[] + self.chunk_cache = {}; + self.chunk = '' + self.chunk_index = -1 + self.chunk_modified = False + self.chunk_size = magic_blocksize + self.dirty_chunks = 0 + + # The chunk table + self.chunks = [] + + # TODO: Better flag handling here? + if flags & os.O_RDONLY: + self.rd = True + + if flags & os.O_RDWR: + self.rd = True + self.wr = True + + if flags & os.O_WRONLY: + self.wr = True + + if flags & os.O_APPEND: + self.wr = True + + if os.path.exists( "./tree" + self.orig_path ): + preexist = True + else: + preexist = False + + # Open the file now and keep the fh around so that cp -a on r/o + # files works (in the create a read-only file for writing case) + src = "./tree" + path + logging.debug( "Saving fh for " + src ) + nflags = os.O_RDWR | os.O_APPEND + if flags & os.O_CREAT: + logging.debug( "Adding O_CREAT" ) + nflags = nflags | os.O_CREAT + + self.file = os.fdopen( os.open( src, nflags, *mode ), + flag2mode( nflags ) ) + + if preexist: + # Read in file info table + logging.debug( "Unpickling: %s" % self.file ) + # TODO: return an IO error if inflating fails + try: + magic = Serializer.loadfh( self.file ) + logging.debug( "Got data: %s" % magic ) + self.size = magic[ 'size' ] + self.chunks = magic[ 'chunks' ] + self.chunk_size = magic[ 'chunk_size' ] + except Exception, e: + logging.critical( self.orig_path + ": " + str( e ) ) + else: + if self.wr: + logging.debug( "File doesn't exist and we're going to write, creating temp empty file" ) + self.modified = True + self.flush() + + self.direct_io = False + self.keep_cache = False + + logging.debug( "%s init complete" % self ) + + def _load_chunk( self, index ): + # If the current chunk is the same as the chunk we're loading + # just return + logging.debug( "_load_chunk: %d" % index ) + + if index == self.chunk_index: + logging.debug( "Load chunk is same as current chunk, all done" ) + return + + # Save this chunk if modified + self._save_chunk() + + logging.debug( "Loading chunk %d" % index ) + key = None + + size = len( self.chunks ) + if index >= size: + logging.debug( "Index doesn't exist" ) + else: + key = self.chunks[ index ] + + if key: + if isinstance( key, str ): + logging.debug( "Found cached dirty page" ) + self.chunk = key + else: + logging.debug( "Index: %s" % key ) + self.chunk = load_chunk( key ) + else: + logging.debug( "No chunk at this index, loading nothing" ) + self.chunk = '' + + logging.debug( "Loaded chunk of length: %d" % len( self.chunk ) ) + + self.chunk_index = index + self.chunk_modified = False + + # This simply puts the chunk data inside our current chunks at chunk_index + def _save_chunk(self): + if self.chunk_modified: + logging.debug( "Saving chunk %d" % self.chunk_index ) + + # Make sure we have room for this chunk + size = len( self.chunks ) + if self.chunk_index >= size: + self.chunks.extend( [ '' ] * ( self.chunk_index -size + 1 ) ) + + # Increment dirty chunks if we had a key here already + if isinstance( self.chunks[ self.chunk_index ], list ) or \ + len( self.chunks[ self.chunk_index ] ) == 0: + self.dirty_chunks += 1 + logging.debug( "Dirty chunks is now: %d" % self.dirty_chunks ) + logging.debug( "Dirty flush at: %d" % dirty_flush ) + + # Save the dirty chunk temporarily in memory + self.chunks[ self.chunk_index ] = self.chunk + + # Flush if we have too many dirty chunks + if self.dirty_chunks > dirty_flush: + self._flush_chunks() + + # This flushes any cached chunks + def _flush_chunks(self): + for index in range( len( self.chunks ) ): + if isinstance( self.chunks[ index ], str ): + logging.debug( "Flushing chunk at %d" % index ) + key = save_chunk( self.chunks[ index ] ) + self.chunks[ index ] = key + logging.debug( "Key was %s" % key ) + self.dirty_chunks = 0 + + def read(self, length, offset): + logging.debug( "Reading from %s offset: %d (0x%x) length: %d (0x%d)" % + ( self.orig_path, offset, offset, length, length ) ) + + data_read = 0 + data = '' + index = int( offset / self.chunk_size ) + rest = offset % self.chunk_size + is_eof = False + + # Keep reading chunks until we have at least this much data + while data_read < length and not is_eof: + logging.debug( "Pulling chunk data: %d" % index ) + self._load_chunk( index ) + if len(self.chunk): + chunk_remaining = len(self.chunk) - rest + to_read = chunk_remaining + data_left = length - data_read + if data_left < chunk_remaining: + to_read = data_left + + logging.debug( "chunk_remaining: %d" % chunk_remaining ) + logging.debug( "data_left: %d" % data_left ) + logging.debug( "data_read: %d" % data_read ) + logging.debug( "rest: %d" % rest ) + logging.debug( "Copying %d bytes" % to_read ) + + data += self.chunk[ rest:(rest+to_read) ] + data_read += to_read + index += 1 + rest = 0 + else: + logging.debug( "No more chunk data, bye" ) + is_eof = True + + logging.debug( "Returning %d bytes of data" % len( data ) ) + logging.debug( "Internal count was: %d" % data_read ) + return data + + def write(self, buf, offset): + if magic_profiling: + return len( buf ) + + logging.debug( "Writing to %s offset: %d (0x%x) length: %d (0x%x)" % + ( self.orig_path, offset, offset, len( buf ), len( buf ) ) ) + + index = int( offset / self.chunk_size ) + rest = offset % self.chunk_size + + logging.debug( "This chunk falls on index: %d rest: %d" % ( index, rest ) ) + logging.debug( "We have %d chunks" % len( self.chunks ) ) + logging.debug( "File size is: %d" % self.size ) + + # If index is higher than the number of blocks we current have it's a seek hole, so we need to extend our blocks out + # We know these need to essentially be zeroed up to this size since + if len( self.chunks ) - 1 < index: + logging.debug( "Not enough chunks %d, need %d, extending" % + ( len( self.chunks ), index + 1 ) ) + this_index = 0 + while this_index < index: + self._load_chunk( this_index ) + fill_null = self.chunk_size - len(self.chunk) + logging.debug( "Filling this chunk with null, bytes: %d" % fill_null ) + self.chunk += "\0" * fill_null + logging.debug( "Chunk is now: %d bytes" % len( self.chunk) ) + self.chunk_modified = True + self._save_chunk() + this_index += 1 + + self._load_chunk( index ) + + # Now check if this chunk needs to be extended + if len( self.chunk ) < rest: + fill_null = rest - len(self.chunk) + logging.debug( "Filling final chunk with null, bytes: %d" % fill_null ) + self.chunk += "\0" * fill_null + self.chunk_modified = True + self._save_chunk() + + buf_offset = 0 + buf_len = len(buf) + + logging.debug( "Length: %d" % buf_len ) + while( buf_offset < buf_len ): + logging.debug( "Pulling in chunk for writing: %d" % index ) + self._load_chunk( index ) + buf_remain = buf_len - buf_offset + chunk_remain = self.chunk_size - rest + + logging.debug( "buf_remain: %d" % buf_remain ) + logging.debug( "chunk_remain: %d" % chunk_remain ) + + if chunk_remain < buf_remain: + logging.debug( "Writing %d bytes, buffer boundry" % chunk_remain ) + this_len = chunk_remain + else: + logging.debug( "Writing final %d bytes" % buf_remain ) + this_len = buf_remain + + logging.debug( "Bytes to copy: %d" % this_len ) + logging.debug( " buf offset: %d" % buf_offset ) + logging.debug( " chunk offset: %d" % rest ) + + if FuseArchive.deep_debug: + logging.debug( "Pre-Buf: %s" % hexlify(buf) ) + logging.debug( "Pre-Chunk: %s" % hexlify(self.chunk) ) + + # Since python doesn't do in-place reassignment like you + # can with splice() we will reconstruct the data by joining + # stuff by offsets (first chars to skip, then our joining + # buf chunk, the everything that would have been after it) + self.chunk = self.chunk[ :rest ] + \ + buf[ buf_offset:(buf_offset+this_len) ] + \ + self.chunk[ (rest + this_len): ] + + if FuseArchive.deep_debug: + logging.debug( "Post-Buf: %s" % hexlify(buf) ) + logging.debug( "Post-Chunk: %s" % hexlify(self.chunk) ) + + buf_offset += this_len + + # Advance to next block + rest = 0 + index += 1 + self.chunk_modified = True + + self._save_chunk() + self.modified = True + if offset + len(buf) > self.size: + self.size = offset + len(buf) + + logging.debug( "This chunk size is now: %d" % len( self.chunk ) ) + logging.debug( "File size is now: %d" % self.size ) + logging.debug( "Num Chunks: %d" % len( self.chunks ) ) + + # Mark us in the dirty cache + dirty_cache[ self.orig_path ] = self + + return len(buf) + + # BUG: If you cp -a a file then quickly ls -l sometimes it doesn't show + # up right? like wrong size and stuff? + # Maybe because release doesn't return a fuse message and is async? + def release(self, flags): + # Deflate the file + logging.debug( "Release: " + self.orig_path ) + self.flush() + self.file.close() + + def _fflush(self): + if self.wr and self.modified: + logging.debug( "_fflush!" ) + # Save our main data + self._save_chunk() + + # And flush any cached chunks + self._flush_chunks() + + save_size = self.size + + # Figure out our size based on the number of chunks + the + # len of the final chunk + numchunks = len( self.chunks ) + if numchunks > 0: + # Load the last chunk + logging.debug( "We have %d chunks, calculating size" % numchunks ) + self._load_chunk( numchunks - 1 ) + self.size = ( numchunks - 1 ) * self.chunk_size + \ + len( self.chunk ) + else: + logging.debug( "No chunks, setting size to zero" ) + self.size = 0 + + # If this assert fails then write/ftruncate failed to set + # things up right somewhere + assert save_size == self.size, "Calculated size of " \ + + self.orig_path + " = " + str( self.size ) \ + + " doesn't match internal size " + str( save_size ) \ + + "\nProbably a bug in write or ftruncate!" + logging.debug( "Size calculated is: %d (0x%x)" % ( self.size, self.size ) ) + + Serializer.dumpfh( self.file, { + 'size': self.size, + 'chunks': self.chunks, + 'chunk_size': self.chunk_size + } ) + + # Not dirty anymore + if self.orig_path in dirty_cache: + del dirty_cache[ self.orig_path ] + + + logging.debug( "_fflush exit" ) + return 1 + + + # Currently we treat fsync as flush since we don't keep any data + # hanging around anyway in fh stuff + def fsync(self, isfsyncfile): + logging.debug( "fsync " + self.orig_path ) + self._fflush() + #if isfsyncfile and hasattr(os, 'fdatasync'): + # os.fdatasync(self.fd) + #else: + # os.fsync(self.fd) + + def flush(self): + logging.debug( "flush " + self.orig_path ) + self._fflush() + + def fgetattr(self): + logging.debug( "Overridding fgetattr" ) + stats = FuseArchiveStat( os.lstat( "./tree" + self.orig_path ) ) + + # Fixed in write? + #if self.modified: + # We would need to fsync here to recalc size, but don't do + # it unless modified? otherwise simple getattr will be + # rewriting a ton of files + # print "WARNING: self.modified causes fgetattr to be incorrect!" + + stats.overstat( self.size ) + return stats + + def ftruncate(self, length): + if not self.wr: + return errno.IOError + + curr_chunks = len( self.chunks ) + need_chunks = ( length / self.chunk_size ) + extra_bytes = length % self.chunk_size + logging.debug( "Ftruncate - %d (0x%x)" % ( length, length ) ) + logging.debug( " - self.size: %d" % self.size ) + logging.debug( " - curr_chunks: %d" % curr_chunks ) + logging.debug( " - need_chunks: %d" % need_chunks ) + logging.debug( " - extra_bytes: %d" % extra_bytes ) + + if extra_bytes: + logging.debug( "Need an extra chunk" ) + need_chunks += 1 + + self._load_chunk( 0 ) + + if length == 0: + logging.debug( "Creating 0 chunk file" ) + self.chunks = [] + self.chunk = '' + elif self.size <= length: + logging.debug( "Need to pad out file, writing/seeking to %d" % length ) + + # Just write out null bytes to the length requested, write will do this for us if we specify the offset + self.write( '', length ) + else: + logging.debug( "Truncating chunks" ) + while True: + logging.debug( "Need chunks: %d curr: %d" % ( need_chunks, curr_chunks ) ) + if need_chunks == curr_chunks: + break + + logging.debug( "Deleting chunk %d" % self.chunk_index ) + self.chunks.pop() + curr_chunks = len( self.chunks ) + + # Now make sure this chunk is the right size, first load the + # last chunk + if len( self.chunks ): + self._load_chunk( len( self.chunks ) - 1 ) + logging.debug( "Loaded final chunk, len: %d" % len( self.chunk ) ) + + # Now truncate this item if needed + if len( self.chunk ) > extra_bytes: + logging.debug( "Truncating final chunk to %d" % extra_bytes ) + self.chunk = self.chunk[ :extra_bytes ] + logging.debug( "Chunk is now: %d bytes" % len( self.chunk ) ) + + self.chunk_modified = True + self.modified = True + self.size = length + self._load_chunk( 0 ) + + logging.debug( "ftruncate complete" ) + self._fflush() + + def lock(self, cmd, owner, **kw): + logging.debug( "WARNING: locking unsupported" ) + return 1 + + # The code here is much rather just a demonstration of the locking + # API than something which actually was seen to be useful. + + # Advisory file locking is pretty messy in Unix, and the Python + # interface to this doesn't make it better. + # We can't do fcntl(2)/F_GETLK from Python in a platfrom independent + # way. The following implementation *might* work under Linux. + # + # if cmd == fcntl.F_GETLK: + # import struct + # + # lockdata = struct.pack('hhQQi', kw['l_type'], os.SEEK_SET, + # kw['l_start'], kw['l_len'], kw['l_pid']) + # ld2 = fcntl.fcntl(self.fd, fcntl.F_GETLK, lockdata) + # flockfields = ('l_type', 'l_whence', 'l_start', 'l_len', 'l_pid') + # uld2 = struct.unpack('hhQQi', ld2) + # res = {} + # for i in xrange(len(uld2)): + # res[flockfields[i]] = uld2[i] + # + # return fuse.Flock(**res) + + # Convert fcntl-ish lock parameters to Python's weird + # lockf(3)/flock(2) medley locking API... + op = { fcntl.F_UNLCK : fcntl.LOCK_UN, + fcntl.F_RDLCK : fcntl.LOCK_SH, + fcntl.F_WRLCK : fcntl.LOCK_EX }[kw['l_type']] + if cmd == fcntl.F_GETLK: + return -errno.EOPNOTSUPP + elif cmd == fcntl.F_SETLK: + if op != fcntl.LOCK_UN: + op |= fcntl.LOCK_NB + elif cmd == fcntl.F_SETLKW: + pass + else: + return -errno.EINVAL + + fcntl.lockf(self.fd, op, kw['l_start'], kw['l_len']) + + @staticmethod + def get_dirty_file( path ): + if path in dirty_cache: + return( dirty_cache[ path ] ) + else: + return None diff --git a/FuseArchive/FileSystem.py b/FuseArchive/FileSystem.py new file mode 100644 index 0000000..dbdc8a3 --- /dev/null +++ b/FuseArchive/FileSystem.py @@ -0,0 +1,154 @@ +import fuse, os, logging, errno +from ChunkFile import ChunkFile + +if not hasattr(fuse, '__version__'): + raise RuntimeError, \ + "your fuse-py doesn't know of fuse.__version__, probably it's too old." + +fuse.fuse_python_api = (0, 2) +fuse.feature_assert('stateful_files', 'has_init') + +class FileSystem(fuse.Fuse): + + def __init__(self, *args, **kw): + + fuse.Fuse.__init__(self, *args, **kw) + self.root = None + + # Fix getattr and fgetattr to? + def getattr(self, path): + treefile = "./tree" + path + + if os.path.isfile( treefile ): + logging.debug( "Delegating getattr to File for " + path ) + + # Check in the dirty cache first (to handle lseek and the + # relatively broken implmentation in fuse/python) + f = ChunkFile.get_dirty_file( path ) + if f: + logging.info( "WORKAROUND: lseek appears to do a gettattr if whence is SEEK_END, using dirty cache object" ) + stats = f.fgetattr() + # no release, it's still being used + else: + f = ChunkFile( path, os.O_RDONLY, 0 ) + stats = f.fgetattr() + f.release( 0 ) + else: + logging.debug( "Using os.lstat to get stats" ) + stats = os.lstat( treefile ) + + return stats + + def readlink(self, path): + return os.readlink("./tree" + path) + + def readdir(self, path, offset): + for e in os.listdir("./tree" + path): + yield fuse.Direntry(e) + + def unlink(self, path): + os.unlink("./tree" + path) + + def rmdir(self, path): + os.rmdir("./tree" + path) + + def symlink(self, path, path1): + os.symlink(path, "./tree" + path1) + + def rename(self, path, path1): + os.rename("./tree" + path, "./tree" + path1) + + def link(self, path, path1): + os.link("./tree" + path, "./tree" + path1) + + def chmod(self, path, mode): + os.chmod("./tree" + path, mode) + + def chown(self, path, user, group): + os.chown("./tree" + path, user, group) + + def truncate(self, path, len): + # Truncate using the ftruncate on the file + logging.debug( "Using FuseArchiveFile to truncate %s to %d" % ( path, len) ) + f = ChunkFile( path, os.O_RDWR, 0 ) + f.ftruncate(len) + f.release( 0 ) + + def mknod(self, path, mode, dev): + os.mknod("./tree" + path, mode, dev) + + def mkdir(self, path, mode): + os.mkdir("./tree" + path, mode) + + def utime(self, path, times): + os.utime("./tree" + path, times) + +# The following utimens method would do the same as the above utime method. +# We can't make it better though as the Python stdlib doesn't know of +# subsecond preciseness in acces/modify times. +# +# def utimens(self, path, ts_acc, ts_mod): +# os.utime("." + path, (ts_acc.tv_sec, ts_mod.tv_sec)) + + def access(self, path, mode): + if not os.access("./tree" + path, mode): + return -errno.EACCES + +# This is how we could add stub extended attribute handlers... +# (We can't have ones which aptly delegate requests to the underlying fs +# because Python lacks a standard xattr interface.) +# +# def getxattr(self, path, name, size): +# val = name.swapcase() + '@' + path +# if size == 0: +# # We are asked for size of the value. +# return len(val) +# return val +# +# def listxattr(self, path, size): +# # We use the "user" namespace to please XFS utils +# aa = ["user." + a for a in ("foo", "bar")] +# if size == 0: +# # We are asked for size of the attr list, ie. joint size of attrs +# # plus null separators. +# return len("".join(aa)) + len(aa) +# return aa + + def statfs(self): + """ + Should return an object with statvfs attributes (f_bsize, f_frsize...). + Eg., the return value of os.statvfs() is such a thing (since py 2.2). + If you are not reusing an existing statvfs object, start with + fuse.StatVFS(), and define the attributes. + + To provide usable information (ie., you want sensible df(1) + output, you are suggested to specify the following attributes: + + - f_bsize - preferred size of file blocks, in bytes + - f_frsize - fundamental size of file blcoks, in bytes + [if you have no idea, use the same as blocksize] + - f_blocks - total number of blocks in the filesystem + - f_bfree - number of free blocks + - f_files - total number of file inodes + - f_ffree - nunber of free file inodes + """ + + return os.statvfs(".") + + def fsinit(self): + os.chdir(self.root) + + def main(self, *a, **kw): + + self.file_class = ChunkFile + + # This is where fragments go + if not os.path.exists( 'storage' ): + os.mkdir( 'storage' ) + + # This is where the real files exist + if not os.path.exists( 'tree' ): + os.mkdir( 'tree' ) + + return fuse.Fuse.main(self, *a, **kw) + diff --git a/FuseArchive/Serializer.py b/FuseArchive/Serializer.py new file mode 100644 index 0000000..b2e1592 --- /dev/null +++ b/FuseArchive/Serializer.py @@ -0,0 +1,40 @@ +import gzip, FuseArchive + +class Serializer: + """This lets us experiment with different main file serializers""" + @staticmethod + def dump( f, obj ): + out = FuseArchiveStream.open( f, "wb" ) + Serializer.dumpfh( obj, out ) # new file format + out.close() + + @staticmethod + def dumpfh( fh, obj ): + logging.debug( "Going to serialize %s to %s" % ( obj, fh ) ) + fh.truncate( 0 ) + fh.seek( 0 ) + f = gzip.GzipFile( None, "wb", gzip_compress_level, fh ) + #f = fh + cPickle.dump( obj, f, -1 ) + del f + fh.flush() + + @staticmethod + def load( f ): + if FuseArchive.magic_profiling: + return { 'size': 0, 'chunks': 0, 'chunk_size': 0 } + + inp = open( f, "rb" ) + magic = Serializer.loadfh( inp ) + inp.close() + return magic + + @staticmethod + def loadfh( fh ): + logging.debug( "Going to load from %s" % fh ) + fh.seek( 0 ) + f = gzip.GzipFile( None, "rb", gzip_compress_level, fh ) + #f = fh + magic = cPickle.load( f ) + return( magic ) + diff --git a/FuseArchive/__init__.py b/FuseArchive/__init__.py new file mode 100644 index 0000000..5883d80 --- /dev/null +++ b/FuseArchive/__init__.py @@ -0,0 +1,5 @@ +from FileSystem import FileSystem + +magic_profiling = False +deep_debug = False + diff --git a/fusearchive.py b/fusearchive.py index bd0d843..1d5281b 100755 --- a/fusearchive.py +++ b/fusearchive.py @@ -8,21 +8,8 @@ # See the file COPYING. # -import os, sys, fcntl, fuse, sha, cPickle, gzip, errno -import zipfile, logging -from fuse import Fuse -from binascii import hexlify - -#import pdb - -if not hasattr(fuse, '__version__'): - raise RuntimeError, \ - "your fuse-py doesn't know of fuse.__version__, probably it's too old." - -fuse.fuse_python_api = (0, 2) - -fuse.feature_assert('stateful_files', 'has_init') - +import logging, sys, os, fuse +import FuseArchive #log_level = logging.DEBUG log_level = logging.WARNING @@ -32,983 +19,48 @@ logging.basicConfig( level = log_level, stream = sys.stderr, filemode = 'w' ) -magic_profiling = False enable_stats = False enable_psyco = False -deep_debug = False - -# These control some of the file output -magic_blocksize = 1024 * 128 -# Use a tiny block size to debug writes, so you can use a smaller test file -#magic_blocksize = 1024 -magic_depth = 5 -gzip_compress_level = 6 -chunkstyle = 'fs' - -# Memory for dirty blocks, per file (1M) -dirty_size = 1024 * 1024 * 1; -# This is the number of actualy blocks in that size -dirty_flush = int( dirty_size / magic_blocksize ) - -# This is a cache of open files by inode, to fix the lseek == size problem -# this causes a failure in fsx-linux becuase to to lseek(fd,0,seek_end) it -# apparently does a getattr to find the file length then subtracts the -# offset from that to pass to write or whatever, since the offset is passed -# to write and we don't maintain one internally. xmp.py also fails this -# test. -dirty_cache = {} - -def flag2mode(flags): - md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'} - m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)] - - if flags & os.O_APPEND: - m = m.replace('w', 'a', 1) - - return m - -def save_chunk( chunk ): - if chunkstyle == 'fs': - return _save_chunk_fs( chunk ) - elif chunkstyle == 'zip': - return _save_chunk_zip( chunk ) - else: - raise ValueError( 'Unknown chunk style' ) - -def load_chunk( key ): - if chunkstyle == 'fs': - return _load_chunk_fs( key ) - elif chunkstyle == 'zip': - return _load_chunk_zip( key ) - else: - raise ValueError( 'Unknown chunk style' ) - -# This will write out a data block, it will return a key that can get this -# data back later -def _save_chunk_fs( chunk ): - if magic_profiling: - return( [ 0, 0 ] ) - - logging.debug( "Begin save_chunk, length: %d" % len( chunk ) ) - if deep_debug: - logging.debug( "Chunk: %s" + hexlify( chunk ) ) - - # Save this hash string, similar to the backuppc algo - digest = sha.new( chunk ).digest() - - # Write out our chunk - chars = list( digest ) - logging.debug( chars ) - - # We make the hexdigest here, yeah we could just call hexdigest() - # but we need to essentially do this same thing to reassemble the - # file anyway - hexdigest = ''.join( [ "%02x" % ord( x ) for x in chars ] ) - - # Subparts just needs the first N chars - subparts = [ "%02x" % ord( x ) for x in chars[ :magic_depth ] ] - - logging.debug( subparts ) - subpath = '/'.join( subparts ) - logging.debug( "Subpath: " + subpath ) - - # Make sure this sub path exists - nextpart = "./storage" - for part in subparts: - nextpart += "/" + part - if not os.path.exists( nextpart ): - logging.debug( "Creating subdir: " + nextpart ) - os.mkdir( nextpart ) - - # Find a chunk slot - sub = 0 - while True: - checkpath = "./storage/%s/%s_%d" % ( subpath, hexdigest, sub ) - logging.debug( "Checking: " + checkpath ) - if os.path.exists( checkpath ): - # Check if this is our data - verify = FuseArchiveStream.open( checkpath, "rb" ) - verify_contents = verify.read() - verify.close() - - if verify_contents == chunk: - logging.debug( "Found existing block" ) - break - else: - logging.debug( "Block exists but is not the same" ) - sub += 1 - else: - # We found a spot, dump our data here - logging.debug( "No block here, creating new block" ) - savechunk = FuseArchiveStream.open( checkpath, "wb" ) - savechunk.write( chunk ) - savechunk.close() - break - - logging.debug( "Got chunk slot: %d" % sub ) - return( [ digest, sub ] ) - -def _save_chunk_zip( chunk ): - if magic_profiling: - return( [ 0, 0 ] ) - - logging.debug( "Begin save_chunk, length: %d" % len( chunk ) ) - if deep_debug: - logging.debug( "Chunk: %s" + hexlify( chunk ) ) - - # Save this hash string, similar to the backuppc algo - digest = sha.new( chunk ).digest() - - # Write out our chunk - chars = list( digest ) - logging.debug( chars ) - - # We make the hexdigest here, yeah we could just call hexdigest() - # but we need to essentially do this same thing to reassemble the - # file anyway - hexdigest = ''.join( [ "%02x" % ord( x ) for x in chars ] ) - - # Should be about max of 32k zip files - zipname = hexdigest[ 0:4 ] + ".zip" - logging.debug( "Zip name: " + zipname ) - if not os.path.exists( "./storage/" + zipname ): - logging.debug( "Creating intial empty zip" ) - z = zipfile.ZipFile( "./storage/" + zipname, 'w', zipfile.ZIP_DEFLATED, True ) - # append mode throws an exception if it's not zip, or maybe it's - # just zero-length files - z.writestr( 'junk', 'junk' ) - z.close() - - z = zipfile.ZipFile( "./storage/" + zipname, 'a', zipfile.ZIP_DEFLATED, True ) - - # Find a chunk slot - sub = 0 - while True: - checkpath = "%s_%d" % ( hexdigest, sub ) - logging.debug( "Checking: " + checkpath ) - try: - data = z.read( checkpath ) - except: - data = '' - - if len(data): - if data == chunk: - logging.debug( "Found existing block" ) - break - else: - logging.debug( "Block exists but is not the same" ) - sub += 1 - else: - # We found a spot, dump our data here - logging.debug( "No block here, creating new block" ) - z.writestr( checkpath, chunk ) - break - - z.close() - logging.debug( "Got chunk slot: %d" % sub ) - return( [ digest, sub ] ) - -# This will return a data block by key that was saved previously -def _load_chunk_fs( key ): - if magic_profiling: - return '' - - ( thash, seq ) = key - logging.debug( "Begin load_chunk" ) - - chars = list( thash ) - logging.debug( chars ) - - # Todo: make a digest -> path function to share with deflate - hexdigest = ''.join( [ "%02x" % ord( x ) for x in chars ] ) - logging.debug( "Hash is: %s sub %d" % ( hexdigest, seq ) ) - subparts = [ "%02x" % ord( x ) for x in chars[ :magic_depth ] ] - subpath = '/'.join( subparts ) - logging.debug( "Subpath: " + subpath ) - - subpath += "/%s_%d" % ( hexdigest, seq ) - - logging.debug( "Chunk path: " + subpath ) - - if os.path.exists( "./storage/" + subpath ): - logging.debug( "Exporting chunk" ) - readchunk = FuseArchiveStream.open( "./storage/" + subpath ) - chunk = readchunk.read() - readchunk.close() - else: - raise IOError - - if deep_debug: - logging.debug( "Load-Chunk: %s" + hexlify( chunk ) ) - - return chunk - -def _load_chunk_zip( key ): - if magic_profiling: - return '' - - ( thash, seq ) = key - logging.debug( "Begin load_chunk" ) - - chars = list( thash ) - logging.debug( chars ) - - # Todo: make a digest -> path function to share with deflate - hexdigest = ''.join( [ "%02x" % ord( x ) for x in chars ] ) - - zipname = hexdigest[ 0:4 ] + ".zip" - logging.debug( "Zip name: " + zipname ) - z = zipfile.ZipFile( "./storage/" + zipname, 'r', zipfile.ZIP_DEFLATED, True ) - - subpath = "%s_%d" % ( hexdigest, seq ) - logging.debug( "Chunk path: " + subpath ) - data = z.read( subpath ) - if len( data ): - logging.debug( "Exporting chunk" ) - chunk = data - else: - z.close() - raise IOError - - if deep_debug: - logging.debug( "Load-Chunk: %s" + hexlify( chunk ) ) - - z.close() - return chunk - - -class FuseArchiveStream: - """This just allows switching out writer classes easily""" - @staticmethod - def open( path, mode = 'r' ): - fh = gzip.open( path, mode, gzip_compress_level ) - #fh = open( path, mode ) - return fh - -class FuseArchiveSerializer: - """This lets us experiment with different main file serializers""" - @staticmethod - def dump( f, obj ): - out = FuseArchiveStream.open( f, "wb" ) - FuseArchiveSerializer.dumpfh( obj, out ) # new file format - out.close() - - @staticmethod - def dumpfh( fh, obj ): - logging.debug( "Going to serialize %s to %s" % ( obj, fh ) ) - fh.truncate( 0 ) - fh.seek( 0 ) - f = gzip.GzipFile( None, "wb", gzip_compress_level, fh ) - #f = fh - cPickle.dump( obj, f, -1 ) - del f - fh.flush() - - @staticmethod - def load( f ): - if magic_profiling: - return { 'size': 0, 'chunks': 0, 'chunk_size': 0 } - - inp = FuseArchiveStream.open( f, "rb" ) - magic = FuseArchiveSerializer.loadfh( inp ) - inp.close() - return magic - - @staticmethod - def loadfh( fh ): - logging.debug( "Going to load from %s" % fh ) - fh.seek( 0 ) - f = gzip.GzipFile( None, "rb", gzip_compress_level, fh ) - #f = fh - magic = cPickle.load( f ) - return( magic ) - -class FuseArchiveStat(fuse.Stat): - def __init__(self, stat): - self.st_mode = stat.st_mode - self.st_ino = stat.st_ino - self.st_dev = stat.st_dev - self.st_rdev = stat.st_rdev - self.st_nlink = stat.st_nlink - self.st_uid = stat.st_uid - self.st_gid = stat.st_gid - self.st_size = stat.st_size - self.st_atime = stat.st_atime - self.st_mtime = stat.st_mtime - self.st_ctime = stat.st_mtime - self.st_blocks = stat.st_blocks - self.st_blksize = stat.st_blksize - - def overstat( self, size ): - self.st_size = size - # Yeah we shouldn't always just add 1 - self.st_blocks = int( self.st_size / 512 ) + 1 - -class FuseArchive(Fuse): - - def __init__(self, *args, **kw): - - Fuse.__init__(self, *args, **kw) - self.root = None - - # Fix getattr and fgetattr to? - def getattr(self, path): - treefile = "./tree" + path - - if os.path.isfile( treefile ): - logging.debug( "Delegating getattr to FuserArchiveFile for " + path ) - - # Check in the dirty cache first (to handle lseek and the - # relatively broken implmentation in fuse/python) - if path in dirty_cache: - logging.info( "WORKAROUND: lseek appears to do a gettattr if whence is SEEK_END, using dirty cache object" ) - f = dirty_cache[ path ] - stats = f.fgetattr() - # no release, it's still being used - else: - f = self.FuseArchiveFile( path, os.O_RDONLY, 0 ) - stats = f.fgetattr() - f.release( 0 ) - else: - logging.debug( "Using os.lstat to get stats" ) - stats = os.lstat( treefile ) - - return stats - - def readlink(self, path): - return os.readlink("./tree" + path) - - def readdir(self, path, offset): - for e in os.listdir("./tree" + path): - yield fuse.Direntry(e) - - def unlink(self, path): - os.unlink("./tree" + path) - - def rmdir(self, path): - os.rmdir("./tree" + path) - - def symlink(self, path, path1): - os.symlink(path, "./tree" + path1) - - def rename(self, path, path1): - os.rename("./tree" + path, "./tree" + path1) - - def link(self, path, path1): - os.link("./tree" + path, "./tree" + path1) - - def chmod(self, path, mode): - os.chmod("./tree" + path, mode) - - def chown(self, path, user, group): - os.chown("./tree" + path, user, group) - - def truncate(self, path, len): - # Truncate using the ftruncate on the file - logging.debug( "Using FuseArchiveFile to truncate %s to %d" % ( path, len) ) - f = self.FuseArchiveFile( path, os.O_RDWR, 0 ) - f.ftruncate(len) - f.release( 0 ) - - def mknod(self, path, mode, dev): - os.mknod("./tree" + path, mode, dev) - - def mkdir(self, path, mode): - os.mkdir("./tree" + path, mode) - - def utime(self, path, times): - os.utime("./tree" + path, times) - -# The following utimens method would do the same as the above utime method. -# We can't make it better though as the Python stdlib doesn't know of -# subsecond preciseness in acces/modify times. -# -# def utimens(self, path, ts_acc, ts_mod): -# os.utime("." + path, (ts_acc.tv_sec, ts_mod.tv_sec)) - - def access(self, path, mode): - if not os.access("./tree" + path, mode): - return -errno.EACCES - -# This is how we could add stub extended attribute handlers... -# (We can't have ones which aptly delegate requests to the underlying fs -# because Python lacks a standard xattr interface.) -# -# def getxattr(self, path, name, size): -# val = name.swapcase() + '@' + path -# if size == 0: -# # We are asked for size of the value. -# return len(val) -# return val -# -# def listxattr(self, path, size): -# # We use the "user" namespace to please XFS utils -# aa = ["user." + a for a in ("foo", "bar")] -# if size == 0: -# # We are asked for size of the attr list, ie. joint size of attrs -# # plus null separators. -# return len("".join(aa)) + len(aa) -# return aa - - def statfs(self): - """ - Should return an object with statvfs attributes (f_bsize, f_frsize...). - Eg., the return value of os.statvfs() is such a thing (since py 2.2). - If you are not reusing an existing statvfs object, start with - fuse.StatVFS(), and define the attributes. - - To provide usable information (ie., you want sensible df(1) - output, you are suggested to specify the following attributes: - - - f_bsize - preferred size of file blocks, in bytes - - f_frsize - fundamental size of file blcoks, in bytes - [if you have no idea, use the same as blocksize] - - f_blocks - total number of blocks in the filesystem - - f_bfree - number of free blocks - - f_files - total number of file inodes - - f_ffree - nunber of free file inodes - """ - - return os.statvfs(".") - - def fsinit(self): - os.chdir(self.root) - - class FuseArchiveFile(object): - - def __init__(self, path, flags, *mode): - # Inflate the file - logging.debug( "Init file: " + path ) - self.orig_path = path - - # init rw and offset - self.rd = False - self.wr = False - self.size = 0 - self.modified = False - - # This is the current in-memory chunk and offset in to data[] - self.chunk_cache = {}; - self.chunk = '' - self.chunk_index = -1 - self.chunk_modified = False - self.chunk_size = magic_blocksize - self.dirty_chunks = 0 - - # The chunk table - self.chunks = [] - # TODO: Better flag handling here? - if flags & os.O_RDONLY: - self.rd = True - - if flags & os.O_RDWR: - self.rd = True - self.wr = True - - if flags & os.O_WRONLY: - self.wr = True - - if flags & os.O_APPEND: - self.wr = True - - if os.path.exists( "./tree" + self.orig_path ): - preexist = True - else: - preexist = False - - # Open the file now and keep the fh around so that cp -a on r/o - # files works (in the create a read-only file for writing case) - src = "./tree" + path - logging.debug( "Saving fh for " + src ) - nflags = os.O_RDWR | os.O_APPEND - if flags & os.O_CREAT: - logging.debug( "Adding O_CREAT" ) - nflags = nflags | os.O_CREAT - - self.file = os.fdopen( os.open( src, nflags, *mode ), - flag2mode( nflags ) ) - - if preexist: - # Read in file info table - logging.debug( "Unpickling: %s" % self.file ) - # TODO: return an IO error if inflating fails - try: - magic = FuseArchiveSerializer.loadfh( self.file ) - logging.debug( "Got data: %s" % magic ) - self.size = magic[ 'size' ] - self.chunks = magic[ 'chunks' ] - self.chunk_size = magic[ 'chunk_size' ] - except Exception, e: - logging.critical( self.orig_path + ": " + str( e ) ) - else: - if self.wr: - logging.debug( "File doesn't exist and we're going to write, creating temp empty file" ) - self.modified = True - self.flush() - - self.direct_io = False - self.keep_cache = False - - logging.debug( "%s init complete" % self ) - - def _load_chunk( self, index ): - # If the current chunk is the same as the chunk we're loading - # just return - logging.debug( "_load_chunk: %d" % index ) - - if index == self.chunk_index: - logging.debug( "Load chunk is same as current chunk, all done" ) - return - - # Save this chunk if modified - self._save_chunk() - - logging.debug( "Loading chunk %d" % index ) - key = None - - size = len( self.chunks ) - if index >= size: - logging.debug( "Index doesn't exist" ) - else: - key = self.chunks[ index ] - - if key: - if isinstance( key, str ): - logging.debug( "Found cached dirty page" ) - self.chunk = key - else: - logging.debug( "Index: %s" % key ) - self.chunk = load_chunk( key ) - else: - logging.debug( "No chunk at this index, loading nothing" ) - self.chunk = '' - - logging.debug( "Loaded chunk of length: %d" % len( self.chunk ) ) - - self.chunk_index = index - self.chunk_modified = False - - # This simply puts the chunk data inside our current chunks at chunk_index - def _save_chunk(self): - if self.chunk_modified: - logging.debug( "Saving chunk %d" % self.chunk_index ) - - # Make sure we have room for this chunk - size = len( self.chunks ) - if self.chunk_index >= size: - self.chunks.extend( [ '' ] * ( self.chunk_index -size + 1 ) ) - - # Increment dirty chunks if we had a key here already - if isinstance( self.chunks[ self.chunk_index ], list ) or \ - len( self.chunks[ self.chunk_index ] ) == 0: - self.dirty_chunks += 1 - logging.debug( "Dirty chunks is now: %d" % self.dirty_chunks ) - logging.debug( "Dirty flush at: %d" % dirty_flush ) - - # Save the dirty chunk temporarily in memory - self.chunks[ self.chunk_index ] = self.chunk - - # Flush if we have too many dirty chunks - if self.dirty_chunks > dirty_flush: - self._flush_chunks() - - # This flushes any cached chunks - def _flush_chunks(self): - for index in range( len( self.chunks ) ): - if isinstance( self.chunks[ index ], str ): - logging.debug( "Flushing chunk at %d" % index ) - key = save_chunk( self.chunks[ index ] ) - self.chunks[ index ] = key - logging.debug( "Key was %s" % key ) - self.dirty_chunks = 0 - - def read(self, length, offset): - logging.debug( "Reading from %s offset: %d (0x%x) length: %d (0x%d)" % - ( self.orig_path, offset, offset, length, length ) ) - - data_read = 0 - data = '' - index = int( offset / self.chunk_size ) - rest = offset % self.chunk_size - is_eof = False - - # Keep reading chunks until we have at least this much data - while data_read < length and not is_eof: - logging.debug( "Pulling chunk data: %d" % index ) - self._load_chunk( index ) - if len(self.chunk): - chunk_remaining = len(self.chunk) - rest - to_read = chunk_remaining - data_left = length - data_read - if data_left < chunk_remaining: - to_read = data_left - - logging.debug( "chunk_remaining: %d" % chunk_remaining ) - logging.debug( "data_left: %d" % data_left ) - logging.debug( "data_read: %d" % data_read ) - logging.debug( "rest: %d" % rest ) - logging.debug( "Copying %d bytes" % to_read ) - - data += self.chunk[ rest:(rest+to_read) ] - data_read += to_read - index += 1 - rest = 0 - else: - logging.debug( "No more chunk data, bye" ) - is_eof = True - - logging.debug( "Returning %d bytes of data" % len( data ) ) - logging.debug( "Internal count was: %d" % data_read ) - return data - - def write(self, buf, offset): - if magic_profiling: - return len( buf ) - - logging.debug( "Writing to %s offset: %d (0x%x) length: %d (0x%x)" % - ( self.orig_path, offset, offset, len( buf ), len( buf ) ) ) - - index = int( offset / self.chunk_size ) - rest = offset % self.chunk_size - - logging.debug( "This chunk falls on index: %d rest: %d" % ( index, rest ) ) - logging.debug( "We have %d chunks" % len( self.chunks ) ) - logging.debug( "File size is: %d" % self.size ) - - # If index is higher than the number of blocks we current have it's a seek hole, so we need to extend our blocks out - # We know these need to essentially be zeroed up to this size since - if len( self.chunks ) - 1 < index: - logging.debug( "Not enough chunks %d, need %d, extending" % - ( len( self.chunks ), index + 1 ) ) - this_index = 0 - while this_index < index: - self._load_chunk( this_index ) - fill_null = self.chunk_size - len(self.chunk) - logging.debug( "Filling this chunk with null, bytes: %d" % fill_null ) - self.chunk += "\0" * fill_null - logging.debug( "Chunk is now: %d bytes" % len( self.chunk) ) - self.chunk_modified = True - self._save_chunk() - this_index += 1 - - self._load_chunk( index ) - - # Now check if this chunk needs to be extended - if len( self.chunk ) < rest: - fill_null = rest - len(self.chunk) - logging.debug( "Filling final chunk with null, bytes: %d" % fill_null ) - self.chunk += "\0" * fill_null - self.chunk_modified = True - self._save_chunk() - - buf_offset = 0 - buf_len = len(buf) - - logging.debug( "Length: %d" % buf_len ) - while( buf_offset < buf_len ): - logging.debug( "Pulling in chunk for writing: %d" % index ) - self._load_chunk( index ) - buf_remain = buf_len - buf_offset - chunk_remain = self.chunk_size - rest - - logging.debug( "buf_remain: %d" % buf_remain ) - logging.debug( "chunk_remain: %d" % chunk_remain ) - - if chunk_remain < buf_remain: - logging.debug( "Writing %d bytes, buffer boundry" % chunk_remain ) - this_len = chunk_remain - else: - logging.debug( "Writing final %d bytes" % buf_remain ) - this_len = buf_remain - - logging.debug( "Bytes to copy: %d" % this_len ) - logging.debug( " buf offset: %d" % buf_offset ) - logging.debug( " chunk offset: %d" % rest ) - - if deep_debug: - logging.debug( "Pre-Buf: %s" % hexlify(buf) ) - logging.debug( "Pre-Chunk: %s" % hexlify(self.chunk) ) - - # Since python doesn't do in-place reassignment like you - # can with splice() we will reconstruct the data by joining - # stuff by offsets (first chars to skip, then our joining - # buf chunk, the everything that would have been after it) - self.chunk = self.chunk[ :rest ] + \ - buf[ buf_offset:(buf_offset+this_len) ] + \ - self.chunk[ (rest + this_len): ] - - if deep_debug: - logging.debug( "Post-Buf: %s" % hexlify(buf) ) - logging.debug( "Post-Chunk: %s" % hexlify(self.chunk) ) - - buf_offset += this_len - - # Advance to next block - rest = 0 - index += 1 - self.chunk_modified = True - - self._save_chunk() - self.modified = True - if offset + len(buf) > self.size: - self.size = offset + len(buf) - - logging.debug( "This chunk size is now: %d" % len( self.chunk ) ) - logging.debug( "File size is now: %d" % self.size ) - logging.debug( "Num Chunks: %d" % len( self.chunks ) ) - - # Mark us in the dirty cache - dirty_cache[ self.orig_path ] = self - - return len(buf) - - # BUG: If you cp -a a file then quickly ls -l sometimes it doesn't show - # up right? like wrong size and stuff? - # Maybe because release doesn't return a fuse message and is async? - def release(self, flags): - # Deflate the file - logging.debug( "Release: " + self.orig_path ) - self.flush() - self.file.close() - - def _fflush(self): - if self.wr and self.modified: - logging.debug( "_fflush!" ) - # Save our main data - self._save_chunk() - - # And flush any cached chunks - self._flush_chunks() - - save_size = self.size - - # Figure out our size based on the number of chunks + the - # len of the final chunk - numchunks = len( self.chunks ) - if numchunks > 0: - # Load the last chunk - logging.debug( "We have %d chunks, calculating size" % numchunks ) - self._load_chunk( numchunks - 1 ) - self.size = ( numchunks - 1 ) * self.chunk_size + \ - len( self.chunk ) - else: - logging.debug( "No chunks, setting size to zero" ) - self.size = 0 - - # If this assert fails then write/ftruncate failed to set - # things up right somewhere - assert save_size == self.size, "Calculated size of " \ - + self.orig_path + " = " + str( self.size ) \ - + " doesn't match internal size " + str( save_size ) \ - + "\nProbably a bug in write or ftruncate!" - logging.debug( "Size calculated is: %d (0x%x)" % ( self.size, self.size ) ) - - FuseArchiveSerializer.dumpfh( self.file, { - 'size': self.size, - 'chunks': self.chunks, - 'chunk_size': self.chunk_size - } ) - - # Not dirty anymore - if self.orig_path in dirty_cache: - del dirty_cache[ self.orig_path ] - - - logging.debug( "_fflush exit" ) - return 1 - - - # Currently we treat fsync as flush since we don't keep any data - # hanging around anyway in fh stuff - def fsync(self, isfsyncfile): - logging.debug( "fsync " + self.orig_path ) - self._fflush() - #if isfsyncfile and hasattr(os, 'fdatasync'): - # os.fdatasync(self.fd) - #else: - # os.fsync(self.fd) - - def flush(self): - logging.debug( "flush " + self.orig_path ) - self._fflush() - - def fgetattr(self): - logging.debug( "Overridding fgetattr" ) - stats = FuseArchiveStat( os.lstat( "./tree" + self.orig_path ) ) - - # Fixed in write? - #if self.modified: - # We would need to fsync here to recalc size, but don't do - # it unless modified? otherwise simple getattr will be - # rewriting a ton of files - # print "WARNING: self.modified causes fgetattr to be incorrect!" - - stats.overstat( self.size ) - return stats - - def ftruncate(self, length): - if not self.wr: - return errno.IOError - - curr_chunks = len( self.chunks ) - need_chunks = ( length / self.chunk_size ) - extra_bytes = length % self.chunk_size - logging.debug( "Ftruncate - %d (0x%x)" % ( length, length ) ) - logging.debug( " - self.size: %d" % self.size ) - logging.debug( " - curr_chunks: %d" % curr_chunks ) - logging.debug( " - need_chunks: %d" % need_chunks ) - logging.debug( " - extra_bytes: %d" % extra_bytes ) - - if extra_bytes: - logging.debug( "Need an extra chunk" ) - need_chunks += 1 - - self._load_chunk( 0 ) - - if length == 0: - logging.debug( "Creating 0 chunk file" ) - self.chunks = [] - self.chunk = '' - elif self.size <= length: - logging.debug( "Need to pad out file, writing/seeking to %d" % length ) - - # Just write out null bytes to the length requested, write will do this for us if we specify the offset - self.write( '', length ) - else: - logging.debug( "Truncating chunks" ) - while True: - logging.debug( "Need chunks: %d curr: %d" % ( need_chunks, curr_chunks ) ) - if need_chunks == curr_chunks: - break - - logging.debug( "Deleting chunk %d" % self.chunk_index ) - self.chunks.pop() - curr_chunks = len( self.chunks ) - - # Now make sure this chunk is the right size, first load the - # last chunk - if len( self.chunks ): - self._load_chunk( len( self.chunks ) - 1 ) - logging.debug( "Loaded final chunk, len: %d" % len( self.chunk ) ) - - # Now truncate this item if needed - if len( self.chunk ) > extra_bytes: - logging.debug( "Truncating final chunk to %d" % extra_bytes ) - self.chunk = self.chunk[ :extra_bytes ] - logging.debug( "Chunk is now: %d bytes" % len( self.chunk ) ) - - self.chunk_modified = True - self.modified = True - self.size = length - self._load_chunk( 0 ) - - logging.debug( "ftruncate complete" ) - self._fflush() - - def lock(self, cmd, owner, **kw): - logging.debug( "WARNING: locking unsupported" ) - return 1 - - # The code here is much rather just a demonstration of the locking - # API than something which actually was seen to be useful. - - # Advisory file locking is pretty messy in Unix, and the Python - # interface to this doesn't make it better. - # We can't do fcntl(2)/F_GETLK from Python in a platfrom independent - # way. The following implementation *might* work under Linux. - # - # if cmd == fcntl.F_GETLK: - # import struct - # - # lockdata = struct.pack('hhQQi', kw['l_type'], os.SEEK_SET, - # kw['l_start'], kw['l_len'], kw['l_pid']) - # ld2 = fcntl.fcntl(self.fd, fcntl.F_GETLK, lockdata) - # flockfields = ('l_type', 'l_whence', 'l_start', 'l_len', 'l_pid') - # uld2 = struct.unpack('hhQQi', ld2) - # res = {} - # for i in xrange(len(uld2)): - # res[flockfields[i]] = uld2[i] - # - # return fuse.Flock(**res) - - # Convert fcntl-ish lock parameters to Python's weird - # lockf(3)/flock(2) medley locking API... - op = { fcntl.F_UNLCK : fcntl.LOCK_UN, - fcntl.F_RDLCK : fcntl.LOCK_SH, - fcntl.F_WRLCK : fcntl.LOCK_EX }[kw['l_type']] - if cmd == fcntl.F_GETLK: - return -errno.EOPNOTSUPP - elif cmd == fcntl.F_SETLK: - if op != fcntl.LOCK_UN: - op |= fcntl.LOCK_NB - elif cmd == fcntl.F_SETLKW: - pass - else: - return -errno.EINVAL - - fcntl.lockf(self.fd, op, kw['l_start'], kw['l_len']) - - - def main(self, *a, **kw): - - self.file_class = self.FuseArchiveFile - - # This is where fragments go - if not os.path.exists( 'storage' ): - os.mkdir( 'storage' ) - - # This is where the real files exist - if not os.path.exists( 'tree' ): - os.mkdir( 'tree' ) - - return Fuse.main(self, *a, **kw) - - -def main(): - - usage = """ +if enable_psyco: + # Import Psyco if available + try: + import psyco + psyco.full() + except ImportError: + pass + +if enable_stats: + import hotshot + prof = hotshot.Profile( "fusearchive_stats" ) + prof.runcall() + prof.close() + +usage = """ Userspace nullfs-alike: mirror the filesystem tree from some point on. -""" + Fuse.fusage - - server = FuseArchive(version="%prog " + fuse.__version__, - usage=usage, - dash_s_do='setsingle') +""" + fuse.Fuse.fusage - server.multithreaded = False +server = FuseArchive.FileSystem(version="%prog " + fuse.__version__, + usage=usage, + dash_s_do='setsingle') - server.parse(values=server, errex=1) +server.multithreaded = False - if len(server.parser.largs) != 2: - print "Usage: " + sys.argv[0] + " storageDirectory mountDirectory" - sys.exit(1) +server.parse(values=server, errex=1) - server.root = server.parser.largs[0] - - try: - if server.fuse_args.mount_expected(): - os.chdir(server.root) - except OSError: - print >> sys.stderr, "can't enter root of underlying filesystem" - sys.exit(1) +if len(server.parser.largs) != 2: + print "Usage: " + sys.argv[0] + " storageDirectory mountDirectory" + sys.exit(1) - server.main() +server.root = server.parser.largs[0] +try: + if server.fuse_args.mount_expected(): + os.chdir(server.root) +except OSError: + print >> sys.stderr, "can't enter root of underlying filesystem" + sys.exit(1) -if __name__ == '__main__': - if enable_psyco: - # Import Psyco if available - try: - import psyco - psyco.full() - except ImportError: - pass +server.main() - if enable_stats: - import hotshot - prof = hotshot.Profile( "fusearchive_stats" ) - prof.runcall(main) - prof.close() - else: - main() -- cgit v0.10.2