#!/usr/bin/env python

#    Copyright (C) 2001  Jeff Epler
#    Copyright (C) 2006  Csaba Henk
#    Copyright (C) 2009  Steve Slaven
#
#    This program can be distributed under the terms of the GNU LGPL.
#    See the file COPYING.
#

"""FUSE filesystem that stores file contents as de-duplicated gzip chunks.

Layout inside the storage root (the process chdir()s there):

  ./tree     mirrors the mounted directory structure.  Every regular file
             in it is a gzip-compressed pickle holding the logical size,
             the chunk size and the list of chunk keys of the real file.
  ./storage  holds the chunks themselves, addressed by the SHA-1 digest of
             their content.

This module targets Python 2 (chunks are handled as ``str``).
"""

import os
import sys
import shutil
import fcntl
import fuse
import re
import tempfile
import pickle
import gzip
import pdb

from errno import *
from stat import *
from fuse import Fuse

try:
    # hashlib supersedes the deprecated sha module
    from hashlib import sha1 as _sha1
except ImportError:
    # Very old interpreters only ship the sha module
    import sha
    _sha1 = sha.new


if not hasattr(fuse, '__version__'):
    raise RuntimeError(
        "your fuse-py doesn't know of fuse.__version__, probably it's too old.")

fuse.fuse_python_api = (0, 2)

fuse.feature_assert('stateful_files', 'has_init')

magic_blocksize = 1024 * 128
magic_depth = 5
debug_level = 0
gzip_compress_level = 1

# Memory for dirty blocks, per file (1M)
dirty_size = 1024 * 1024 * 1
# This is the number of actual blocks in that size
dirty_flush = dirty_size // magic_blocksize

# Locations of the chunk store and of the file tree, relative to the cwd
storage_root = "./storage"
tree_root = "./tree"


def dmsg(level, message):
    """Print ``message`` to stdout when ``level`` is within ``debug_level``."""
    if level <= debug_level:
        print(str(level) + ": " + str(message))


def _tree_path(path):
    """Return the location inside the tree directory for FUSE path ``path``."""
    return tree_root + path


def _hex_pairs(digest):
    """Return the two-character hex representation of each digest byte."""
    return ["%02x" % ord(char) for char in digest]


def _chunk_dir(digest):
    """Return the storage directory holding chunks with this digest.

    The directory is nested ``magic_depth`` levels deep, one level per
    leading digest byte.
    """
    return "/".join([storage_root] + _hex_pairs(digest)[:magic_depth])


def _chunk_path(digest, seq):
    """Return the storage file path for the chunk key ``(digest, seq)``."""
    hexdigest = "".join(_hex_pairs(digest))
    return _chunk_dir(digest) + "/" + hexdigest + "_" + str(seq)


def save_chunk(chunk):
    """Write a data block to the chunk store and return its key.

    The block is addressed by its SHA-1 digest.  If a stored block with the
    same digest has different content, the next free sequence number is
    used, so distinct blocks never overwrite each other.

    Returns the key as a list ``[digest, sequence]`` which load_chunk()
    accepts.
    """
    dmsg(2, "Begin save_chunk")

    digest = _sha1(chunk).digest()

    # Make sure the directory for this digest exists
    chunk_dir = _chunk_dir(digest)
    dmsg(3, "Subpath: " + chunk_dir)
    if not os.path.exists(chunk_dir):
        dmsg(3, "Creating subdir: " + chunk_dir)
        os.makedirs(chunk_dir)

    # Find a chunk slot
    sub = 0
    while True:
        checkpath = _chunk_path(digest, sub)
        dmsg(3, "Checking: " + checkpath)

        if not os.path.exists(checkpath):
            # We found a spot, dump our data here
            dmsg(3, "No block here, creating new block")
            savechunk = gzip.open(checkpath, "wb", gzip_compress_level)
            try:
                savechunk.write(chunk)
            finally:
                savechunk.close()
            break

        # Something is stored here already, check if this is our data
        verify = gzip.open(checkpath, "rb")
        try:
            verify_contents = verify.read()
        finally:
            verify.close()

        if verify_contents == chunk:
            dmsg(3, "Found existing block")
            break

        dmsg(3, "Block exists but is not the same")
        sub += 1

    dmsg(3, "Got chunk slot: " + str(sub))
    return [digest, sub]


def load_chunk(key):
    """Return the data block that save_chunk() stored under ``key``.

    ``key`` is a ``(digest, sequence)`` pair.

    Raises IOError (errno EIO) when the chunk file does not exist.
    """
    (digest, seq) = key
    dmsg(2, "Begin load_chunk")

    chunk_path = _chunk_path(digest, seq)
    dmsg(3, "Chunk path: " + chunk_path)

    if not os.path.exists(chunk_path):
        raise IOError(EIO, "Missing chunk in storage", chunk_path)

    dmsg(3, "Exporting chunk")
    readchunk = gzip.open(chunk_path, "rb")
    try:
        return readchunk.read()
    finally:
        readchunk.close()


class FuseArchiveStat(fuse.Stat):
    """Copy of an os.stat result whose size fields can be overridden."""

    _fields = ('st_mode', 'st_ino', 'st_dev', 'st_rdev', 'st_nlink',
               'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime',
               'st_ctime', 'st_blocks', 'st_blksize')

    def __init__(self, stat):
        """Copy every stat field from ``stat`` (an os.stat/os.lstat result)."""
        for field in self._fields:
            setattr(self, field, getattr(stat, field))

    def overstat(self, size):
        """Report ``size`` as the file size, with st_blocks to match.

        st_blocks is the size rounded up to whole 512-byte units.
        """
        self.st_size = size
        self.st_blocks = int((size + 511) // 512)


class FuseArchive(Fuse):
    """Filesystem operations; everything is delegated to the tree directory.

    Regular files are handled through the nested FuseArchiveFile class, all
    other operations act directly on the entries below ./tree.
    """

    def __init__(self, *args, **kw):
        Fuse.__init__(self, *args, **kw)
        self.root = None

    def getattr(self, path):
        """Return stat information for ``path`` without following symlinks.

        For regular files the size comes from the archive metadata instead
        of the size of the metadata file itself.
        """
        treefile = _tree_path(path)

        # os.path.isfile follows symlinks, so links are excluded explicitly
        if os.path.isfile(treefile) and not os.path.islink(treefile):
            dmsg(3, "Delegating getattr to FuseArchiveFile")
            f = self.FuseArchiveFile(path, os.O_RDONLY, 0)
            stats = f.fgetattr()
            f.release(0)
        else:
            dmsg(3, "Using os.lstat to get stats")
            stats = os.lstat(treefile)

        return stats

    def readlink(self, path):
        return os.readlink(_tree_path(path))

    def readdir(self, path, offset):
        for entry in os.listdir(_tree_path(path)):
            yield fuse.Direntry(entry)

    def unlink(self, path):
        os.unlink(_tree_path(path))

    def rmdir(self, path):
        os.rmdir(_tree_path(path))

    def symlink(self, path, path1):
        os.symlink(path, _tree_path(path1))

    def rename(self, path, path1):
        os.rename(_tree_path(path), _tree_path(path1))

    def link(self, path, path1):
        os.link(_tree_path(path), _tree_path(path1))

    def chmod(self, path, mode):
        os.chmod(_tree_path(path), mode)

    def chown(self, path, user, group):
        os.chown(_tree_path(path), user, group)

    def truncate(self, path, len):
        """Resize the file at ``path`` to ``len`` bytes."""
        dmsg(2, "Using FuseArchiveFile to truncate " + path + " to " + str(len))
        f = self.FuseArchiveFile(path, os.O_WRONLY | os.O_APPEND, 0)
        f.ftruncate(len)
        f.release(0)

    def mknod(self, path, mode, dev):
        os.mknod(_tree_path(path), mode, dev)

    def mkdir(self, path, mode):
        os.mkdir(_tree_path(path), mode)

    def utime(self, path, times):
        os.utime(_tree_path(path), times)

#    The following utimens method would do the same as the above utime method.
#    We can't make it better though as the Python stdlib doesn't know of
#    subsecond precision in access/modify times.
#
#    def utimens(self, path, ts_acc, ts_mod):
#      os.utime("." + path, (ts_acc.tv_sec, ts_mod.tv_sec))

    def access(self, path, mode):
        """Return -EACCES when the tree entry is not accessible with ``mode``."""
        if not os.access(_tree_path(path), mode):
            return -EACCES

#    This is how we could add stub extended attribute handlers...
#    (We can't have ones which aptly delegate requests to the underlying fs
#    because Python lacks a standard xattr interface.)
#
#    def getxattr(self, path, name, size):
#        val = name.swapcase() + '@' + path
#        if size == 0:
#            # We are asked for size of the value.
#            return len(val)
#        return val
#
#    def listxattr(self, path, size):
#        # We use the "user" namespace to please XFS utils
#        aa = ["user." + a for a in ("foo", "bar")]
#        if size == 0:
#            # We are asked for size of the attr list, ie. joint size of attrs
#            # plus null separators.
#            return len("".join(aa)) + len(aa)
#        return aa

    def statfs(self):
        """
        Should return an object with statvfs attributes (f_bsize, f_frsize...).
        Eg., the return value of os.statvfs() is such a thing (since py 2.2).
        If you are not reusing an existing statvfs object, start with
        fuse.StatVFS(), and define the attributes.

        To provide usable information (ie., you want sensible df(1)
        output), you are suggested to specify the following attributes:

            - f_bsize - preferred size of file blocks, in bytes
            - f_frsize - fundamental size of file blocks, in bytes
                [if you have no idea, use the same as blocksize]
            - f_blocks - total number of blocks in the filesystem
            - f_bfree - number of free blocks
            - f_files - total number of file inodes
            - f_ffree - number of free file inodes
        """
        return os.statvfs(".")

    def fsinit(self):
        os.chdir(self.root)

    class FuseArchiveFile(object):
        """Open handle on one archived regular file.

        ``self.chunks`` is the chunk table.  Each entry is either a key (a
        list as returned by save_chunk()), a ``str`` holding dirty chunk data
        that has not been written to the store yet, or ``''`` for a chunk
        that was never written.  ``self.chunk`` is the chunk currently being
        worked on and ``self.chunk_index`` its position in the table.

        Every chunk except the last one is treated as ``chunk_size`` bytes
        long; missing bytes read back as NUL bytes.
        """

        def __init__(self, path, flags, *mode):
            """Open ``path`` with open(2)-style ``flags``.

            An existing metadata file is loaded.  When the file is missing
            (or empty) and the handle is writable, an empty archive file is
            created right away.
            """
            dmsg(1, "Init file: " + path)
            self.orig_path = path

            # init rw and offset
            self.offset = 0
            self.rd = False
            self.wr = False
            self.size = 0
            self.modified = False

            # This is the current in-memory chunk and offset in to data[]
            self.chunk_cache = {}
            self.chunk = ''
            self.chunk_index = -1
            self.chunk_modified = False
            self.chunk_size = magic_blocksize
            self.dirty_chunks = 0

            # The chunk table
            self.chunks = []

            # The access mode is a small enumeration, not a bit per mode,
            # so it has to be masked out and compared.
            accmode = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
            if accmode == os.O_RDONLY:
                self.rd = True
            elif accmode == os.O_WRONLY:
                self.wr = True
            elif accmode == os.O_RDWR:
                self.rd = True
                self.wr = True

            if flags & os.O_APPEND:
                self.wr = True
                # TODO: handle offset -1
                self.offset = -1

            src = _tree_path(self.orig_path)
            # A zero-length file (e.g. left behind by mknod) carries no
            # table yet and is treated like a file that is still to be
            # created.
            if os.path.exists(src) and os.path.getsize(src) > 0:
                # Read in file info table
                dmsg(3, "Unpickling: " + src)
                # NOTE: pickle executes whatever the file describes, the
                # tree directory must therefore be trusted.
                # TODO: return an IO error if inflating fails
                inp = gzip.open(src, "rb")
                try:
                    magic = pickle.load(inp)
                finally:
                    inp.close()
                dmsg(3, "Got data: " + str(magic))
                self.size = magic['size']
                self.chunks = magic['chunks']
                self.chunk_size = magic['chunk_size']
            elif self.wr:
                dmsg(2, "File doesn't exist and we're going to write, creating temp empty file")
                self.modified = True
                self.flush()

            self.direct_io = False
            self.keep_cache = False

            dmsg(3, str(self) + " init complete")

        def _load_chunk(self, index):
            """Make the chunk at ``index`` the current in-memory chunk.

            The previous current chunk is stored in the table first when it
            was modified.  An index without data yields an empty chunk.
            """
            # If the current chunk is the same as the chunk we're loading
            # just return
            if index == self.chunk_index:
                dmsg(3, "Load chunk is same as current chunk, all done")
                return

            # Save this chunk if modified
            self._save_chunk()

            dmsg(3, "Loading chunk " + str(index))
            key = None
            if index < len(self.chunks):
                key = self.chunks[index]
            else:
                dmsg(3, "Index doesn't exist")

            if not key:
                dmsg(3, "No chunk at this index, loading nothing")
                self.chunk = ''
            elif isinstance(key, str):
                dmsg(3, "Found cached dirty page")
                self.chunk = key
            else:
                dmsg(3, "Index: " + str(key))
                self.chunk = load_chunk(key)

            self.chunk_index = index
            self.chunk_modified = False

        def _save_chunk(self):
            """Put the current chunk into the chunk table when it was modified.

            The data is kept in memory as a dirty table entry; once more than
            ``dirty_flush`` entries are dirty they are written to the store.
            """
            if not self.chunk_modified:
                return

            # Make sure we have room for this chunk
            size = len(self.chunks)
            if self.chunk_index >= size:
                self.chunks.extend([''] * (self.chunk_index - size + 1))

            # The slot turns dirty if it held a key or nothing so far
            previous = self.chunks[self.chunk_index]
            if isinstance(previous, list) or len(previous) == 0:
                self.dirty_chunks += 1

            dmsg(3, "Dirty chunks is now: " + str(self.dirty_chunks))
            dmsg(3, "Dirty flush at: " + str(dirty_flush))

            # Save the dirty chunk temporarily in memory
            self.chunks[self.chunk_index] = self.chunk
            # The table is in sync with the current chunk again; without
            # this, later saves would put the raw data back over its key.
            self.chunk_modified = False

            # Flush if we have too many dirty chunks
            if self.dirty_chunks > dirty_flush:
                self._flush_chunks()

        def _flush_chunks(self):
            """Write every dirty table entry to the store, keeping its key."""
            for index, entry in enumerate(self.chunks):
                if isinstance(entry, str):
                    dmsg(3, "Flushing chunk at " + str(index))
                    key = save_chunk(entry)
                    self.chunks[index] = key
                    dmsg(3, "Key was " + str(key))

            self.dirty_chunks = 0

        def _compute_size(self):
            """Return the file size implied by the chunk table.

            That is the number of chunks before the last one times the chunk
            size, plus the length of the last chunk.  The current chunk must
            have been stored with _save_chunk() before.
            """
            numchunks = len(self.chunks)
            if numchunks == 0:
                dmsg(3, "No chunks, setting size to zero")
                return 0

            # Load the last chunk
            dmsg(3, "We have " + str(numchunks) + " chunks, calculating size")
            self._load_chunk(numchunks - 1)
            return (numchunks - 1) * self.chunk_size + len(self.chunk)

        def read(self, length, offset):
            """Return at most ``length`` bytes starting at ``offset``.

            Less data (possibly an empty string) is returned at end of file.
            """
            dmsg(3, "Reading from " + self.orig_path + " offset: " + str(offset)
                 + " length: " + str(length))

            pieces = []
            data_read = 0
            index = offset // self.chunk_size
            rest = offset % self.chunk_size

            # Keep reading chunks until we have at least this much data
            while data_read < length:
                dmsg(3, "Pulling chunk data")
                self._load_chunk(index)

                chunk = self.chunk
                # Only the last chunk may be short; an earlier short chunk
                # is a hole that reads back as NUL bytes.
                if index < len(self.chunks) - 1 and len(chunk) < self.chunk_size:
                    chunk = chunk.ljust(self.chunk_size, '\0')

                available = len(chunk) - rest
                if available <= 0:
                    dmsg(3, "No more chunk data, bye")
                    break

                to_read = min(available, length - data_read)
                dmsg(3, "Copying " + str(to_read) + " bytes")
                pieces.append(chunk[rest:rest + to_read])
                data_read += to_read

                index += 1
                rest = 0

            return ''.join(pieces)

        def write(self, buf, offset):
            """Write ``buf`` at ``offset`` and return the number of bytes written.

            The data only changes in-memory chunks; it reaches the store
            when the handle is flushed or too many chunks are dirty.
            """
            dmsg(3, "Writing to " + self.orig_path + " offset: " + str(offset))

            index = offset // self.chunk_size
            rest = offset % self.chunk_size

            buf_offset = 0
            buf_len = len(buf)

            dmsg(3, "Length: " + str(buf_len))
            while buf_offset < buf_len:
                dmsg(3, "Pulling in chunk for writing")
                self._load_chunk(index)

                buf_remain = buf_len - buf_offset
                chunk_remain = self.chunk_size - rest
                if chunk_remain < buf_remain:
                    dmsg(3, "Writing " + str(chunk_remain) + " bytes, buffer boundry")
                    this_len = chunk_remain
                else:
                    dmsg(3, "Writing final " + str(buf_remain) + " bytes")
                    this_len = buf_remain

                # Strings can't be modified in place, so the chunk is
                # rebuilt: the part before the write position (NUL padded
                # when the chunk is shorter than that), the new data, then
                # whatever followed the overwritten range.
                self.chunk = (self.chunk[:rest].ljust(rest, '\0')
                              + buf[buf_offset:buf_offset + this_len]
                              + self.chunk[rest + this_len:])
                self.chunk_modified = True

                buf_offset += this_len

                # Advance to next block
                rest = 0
                index += 1

            self.modified = True

            return buf_len

        # BUG: If you cp -a a file then quickly ls -l sometimes it doesn't show
        # up right? like wrong size and stuff?
        # Maybe because release doesn't return a fuse message and is async?
        def release(self, flags):
            """Flush pending changes when the handle is closed."""
            dmsg(2, "Release: " + self.orig_path)
            self.flush()

        def _fflush(self):
            """Write dirty chunks and the metadata file if anything changed."""
            if self.wr and self.modified:
                dmsg(3, "_fflush!")
                # Save our main data
                self._save_chunk()

                # And flush any cached chunks
                self._flush_chunks()

                # Figure out our size based on the number of chunks + the
                # len of the final chunk
                self.size = self._compute_size()
                dmsg(3, "Size calculated is: " + str(self.size))

                out = gzip.open(_tree_path(self.orig_path), "wb",
                                gzip_compress_level)
                try:
                    pickle.dump({
                        'size': self.size,
                        'chunks': self.chunks,
                        'chunk_size': self.chunk_size
                    }, out)
                finally:
                    out.close()

                # Everything is on disk now
                self.modified = False

            return 1

        # Currently we treat fsync as flush since we don't keep any data
        # hanging around anyway in fh stuff
        def fsync(self, isfsyncfile):
            dmsg(2, "fsync " + self.orig_path)
            self._fflush()

        def flush(self):
            dmsg(2, "flush " + self.orig_path)
            self._fflush()

        def fgetattr(self):
            """Return the stat of the metadata file with the logical file size."""
            dmsg(3, "Overridding fgetattr")
            stats = FuseArchiveStat(os.lstat(_tree_path(self.orig_path)))

            if self.modified:
                # The stored size is stale while there are unflushed writes,
                # recalculate it from the in-memory chunk table.
                self._save_chunk()
                self.size = self._compute_size()

            stats.overstat(self.size)
            return stats

        def ftruncate(self, len):
            """Resize the file to ``len`` bytes and write the result out.

            Shrinking drops the data behind the new end, growing appends NUL
            bytes.
            """
            # The parameter shadows the builtin len(), so the work is done
            # in a helper.
            self._resize(len)

        def _resize(self, new_size):
            """Shrink or grow the file to ``new_size`` bytes, then flush."""
            new_size = max(int(new_size), 0)

            # Bring the chunk table up to date before changing it
            self._save_chunk()

            if new_size == 0:
                self.chunks = []
                self._forget_chunk()
                self.dirty_chunks = 0
            else:
                old_size = self._compute_size()
                if new_size < old_size:
                    self._shrink(new_size)
                elif new_size > old_size:
                    self._grow(old_size, new_size)

            self.modified = True
            self._fflush()

        def _forget_chunk(self):
            """Drop the current in-memory chunk without storing it."""
            self.chunk = ''
            self.chunk_index = -1
            self.chunk_modified = False

        def _shrink(self, new_size):
            """Cut the chunk table down to ``new_size`` (> 0) bytes."""
            keep = (new_size + self.chunk_size - 1) // self.chunk_size
            tail_len = new_size - (keep - 1) * self.chunk_size

            del self.chunks[keep:]
            # The current chunk may be one of the dropped ones
            self._forget_chunk()

            self._load_chunk(keep - 1)
            if len(self.chunk) != tail_len:
                # Trim the new last chunk, or NUL pad it if it was a hole
                self.chunk = self.chunk[:tail_len].ljust(tail_len, '\0')
                self.chunk_modified = True

        def _grow(self, old_size, new_size):
            """Append NUL bytes so the file grows from old_size to new_size."""
            position = old_size
            while position < new_size:
                # At most one chunk at a time to bound memory use
                step = min(self.chunk_size, new_size - position)
                self.write('\0' * step, position)
                position += step

        def lock(self, cmd, owner, **kw):
            """Locking is not implemented; the request is only logged.

            Always returns 1.
            """
            # The fcntl/lockf based demonstration code of the original
            # example can't be used here as this class has no underlying
            # file descriptor.
            dmsg(3, "WARNING: locking unsupported")
            return 1

    def main(self, *a, **kw):
        """Create the storage layout if needed and run the FUSE main loop."""
        self.file_class = self.FuseArchiveFile

        # 'storage' is where fragments go, 'tree' is where the real files
        # exist
        for directory in ('storage', 'tree'):
            if not os.path.exists(directory):
                os.mkdir(directory)

        return Fuse.main(self, *a, **kw)


def main():
    """Parse the command line and mount the filesystem."""
    usage = """
Userspace nullfs-alike: mirror the filesystem tree from some point on.

""" + Fuse.fusage

    server = FuseArchive(version="%prog " + fuse.__version__,
                         usage=usage,
                         dash_s_do='setsingle')

    server.multithreaded = False

    server.parse(values=server, errex=1)

    if len(server.parser.largs) != 2:
        print("Usage: " + sys.argv[0] + " storageDirectory mountDirectory")
        sys.exit(1)

    server.root = server.parser.largs[0]

    try:
        if server.fuse_args.mount_expected():
            os.chdir(server.root)
    except OSError:
        sys.stderr.write("can't enter root of underlying filesystem\n")
        sys.exit(1)

    server.main()


if __name__ == '__main__':
    # Psyco doesn't seem to make a difference, the time must not be spent on
    # this end.

    # Flip to True to profile a run with hotshot
    if False:
        import hotshot
        prof = hotshot.Profile("fusearchive_stats")
        prof.runcall(main)
        prof.close()
    else:
        main()