# FUSE file implementation backed by a table of content-addressed chunks.
#
# The real file under ./tree/<path> stores only a small key; that key
# resolves (via load_chunk) to a serialized dict holding the logical size,
# the chunk size and the chunk-key table.  Reads and writes operate on one
# in-memory chunk at a time; dirty chunks are buffered as ChunkBuffer
# objects inside self.chunks and flushed in batches of dirty_flush.

import logging
import os
import errno
import fcntl
import copy

import fuse

import FuseArchive
import FuseArchive.Storage.ZipFile
import FuseArchive.Storage.FileSystem
from binascii import hexlify
from FuseArchive.Serializer import Serializer
from ChunkBuffer import ChunkBuffer

# These control some of the file output
magic_blocksize = 1024 * 1024 * 5
# Use a tiny block size to debug writes, so you can use a smaller test file
#magic_blocksize = 1024
chunkstyle = 'fs'

# This is the number of dirty in-memory chunks we buffer before flushing
# them out to chunk storage
dirty_flush = 5

# This is a cache of open files by inode, to fix the lseek == size problem
# this causes a failure in fsx-linux because to do lseek(fd,0,seek_end) it
# apparently does a getattr to find the file length then subtracts the
# offset from that to pass to write or whatever, since the offset is passed
# to write and we don't maintain one internally.  xmp.py also fails this
# test.
dirty_cache = {}


def flag2mode(flags):
    """Map open(2) style flags to an fopen(3) style mode string."""
    md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
    m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]

    if flags & os.O_APPEND:
        m = m.replace('w', 'a', 1)

    return m


# Bind the chunk storage backend.  Bug fix: the original raised
# "ValueException", which is not a builtin and would itself be a NameError;
# ValueError is the intended exception.
if chunkstyle == 'fs':
    load_chunk = FuseArchive.Storage.FileSystem.load_chunk
    save_chunk = FuseArchive.Storage.FileSystem.save_chunk
    lock_chunk = FuseArchive.Storage.FileSystem.lock_chunk
    unlock_chunk = FuseArchive.Storage.FileSystem.unlock_chunk
elif chunkstyle == 'zip':
    raise ValueError( "Zip storage doesn't support lock/unlock, make an inteface!" )
    load_chunk = FuseArchive.Storage.ZipFile.load_chunk
    save_chunk = FuseArchive.Storage.ZipFile.save_chunk
else:
    raise ValueError( 'Invalid chunk style' )


class FuseArchiveStat(fuse.Stat):
    """A mutable copy of an os.stat_result whose size we can override.

    getattr on an archive file stats the tiny key file under ./tree, so
    overstat() is used to substitute the logical (inflated) size.
    """

    def __init__(self, stat):
        self.st_mode = stat.st_mode
        self.st_ino = stat.st_ino
        self.st_dev = stat.st_dev
        self.st_rdev = stat.st_rdev
        self.st_nlink = stat.st_nlink
        self.st_uid = stat.st_uid
        self.st_gid = stat.st_gid
        self.st_size = stat.st_size
        self.st_atime = stat.st_atime
        self.st_mtime = stat.st_mtime
        # NOTE(review): st_ctime is copied from st_mtime here -- looks like
        # a copy/paste slip, but preserved as-is since behavior may be
        # relied upon; confirm before changing.
        self.st_ctime = stat.st_mtime
        self.st_blocks = stat.st_blocks
        self.st_blksize = stat.st_blksize

    def overstat( self, size ):
        """Override st_size/st_blocks with the logical archive size."""
        self.st_size = size
        # Yeah we shouldn't always just add 1
        self.st_blocks = int( self.st_size / 512 ) + 1


class ChunkFile(object):
    """One open archive file: a key file on disk plus a chunk table."""

    def __init__(self, path, flags, *mode):
        # Inflate the file
        logging.debug( "Init file: " + path )
        self.orig_path = path

        # init rw and offset
        self.rd = False
        self.wr = False
        self.size = 0
        self.modified = False

        # This is the current in-memory chunk and offset in to data[]
        self.chunk_cache = {}
        self.chunk = ChunkBuffer()
        self.chunk_index = -1
        self.chunk_modified = False
        self.chunk_size = magic_blocksize
        self.dirty_chunks = 0

        # The chunk table
        self.chunks = []

        # Because python is bizarre and you magically define attributes in
        # the constructor we predefine this here for the case where we
        # fflush early if we're creating a new file since we reference this
        # attribute in the routine.  At least it gets initialized I guess
        self.original_chunks = []
        self.original_key = None

        # TODO: Better flag handling here?
        # Bug fix: os.O_RDONLY is 0 on POSIX, so "flags & os.O_RDONLY" can
        # never be true.  The access mode is a 2-bit field, so compare the
        # O_ACCMODE bits instead of masking individual flags.
        accmode = flags & os.O_ACCMODE
        if accmode == os.O_RDONLY:
            self.rd = True
        if accmode == os.O_RDWR:
            self.rd = True
            self.wr = True
        if accmode == os.O_WRONLY:
            self.wr = True
        if flags & os.O_APPEND:
            self.wr = True

        if os.path.exists( "./tree" + self.orig_path ):
            preexist = True
        else:
            preexist = False

        # Open the file now and keep the fh around so that cp -a on r/o
        # files works (in the create a read-only file for writing case)
        src = "./tree" + path
        logging.debug( "Saving fh for " + src )
        if flags == 0:
            # o_rdonly
            nflags = flags
        else:
            # We need rw to cp -a on r/o files, actually we need rw on
            # almost any write only operation to read out our chunks list
            nflags = os.O_RDWR | os.O_APPEND

        if flags & os.O_CREAT:
            logging.debug( "Adding O_CREAT" )
            nflags = nflags | os.O_CREAT

        logging.debug( "Flags & O_RDONLY %d" % (flags & os.O_RDONLY) )
        logging.debug( "Flags & O_RDWR %d" % (flags & os.O_RDWR) )
        logging.debug( "Flags & O_WRONLY %d" % (flags & os.O_WRONLY) )
        logging.debug( "Flags & O_APPEND %d" % (flags & os.O_APPEND) )
        logging.debug( "Flags & O_CREAT %d" % (flags & os.O_CREAT) )

        self.file = os.fdopen( os.open( src, nflags, *mode ),
                               flag2mode( nflags ) )

        if preexist:
            # Read in file info table
            logging.debug( "Unpickling: %s" % self.file )
            # TODO: return an IO error if inflating fails
            try:
                magic = Serializer.loadfh( self.file )
                logging.debug( "Got data: %s" % magic )

                # This is just a key to a block to minimize complete
                # duplicates
                logging.debug( "Reading chunk to get actual file data" )
                self.original_key = magic
                file_chunk = load_chunk( magic )
                magic = Serializer.loads( file_chunk )
                self.size = magic[ 'size' ]
                self.chunks = magic[ 'chunks' ]
                self.chunk_size = magic[ 'chunk_size' ]
                logging.debug( "Loaded size: %d, chunk size: %d, chunks: %d" %
                        ( self.size, self.chunk_size, len( self.chunks ) ) )
            except Exception as e:
                # (was "except Exception, e" -- removed-in-Py3 syntax)
                logging.critical( self.orig_path + ": " + str( e ) )
        else:
            if self.wr:
                logging.debug( "File doesn't exist and we're going to write, creating temp empty file" )
                self.modified = True
                self.flush()

        self.direct_io = False
        self.keep_cache = False
        self.original_chunks = copy.deepcopy( self.chunks )

        logging.debug( "%s init complete" % self )

    def _load_chunk( self, index ):
        """Make self.chunk hold the chunk at `index`, saving the old one.

        A missing index yields an empty ChunkBuffer; a dirty (still
        in-memory) entry is used directly.
        """
        # If the current chunk is the same as the chunk we're loading
        # just return
        logging.debug( "_load_chunk: %d" % index )

        if index == self.chunk_index:
            logging.debug( "Load chunk is same as current chunk, all done" )
            return

        # Save this chunk if modified
        self._save_chunk()

        logging.debug( "Loading chunk %d" % index )
        key = None

        size = len( self.chunks )
        if index >= size:
            logging.debug( "Index doesn't exist" )
        else:
            key = self.chunks[ index ]

        if key:
            if isinstance( key, ChunkBuffer ):
                # Dirty chunk still buffered in memory
                logging.debug( "Found cached dirty page" )
                self.chunk = key
            else:
                logging.debug( "Index: %s" % key )
                self.chunk = ChunkBuffer( load_chunk( key ) )
        else:
            logging.debug( "No chunk at this index, loading nothing" )
            self.chunk = ChunkBuffer()

        logging.debug( "Loaded chunk of length: %d" % self.chunk.length() )

        self.chunk_index = index
        self.chunk_modified = False

    # This simply puts the chunk data inside our current chunks at chunk_index
    def _save_chunk(self):
        """Stash the current chunk (if modified) into self.chunks in memory.

        Flushes to chunk storage once dirty_flush chunks have accumulated.
        """
        if self.chunk_modified:
            logging.debug( "Saving chunk %d" % self.chunk_index )

            # Make sure we have room for this chunk
            size = len( self.chunks )
            if self.chunk_index >= size:
                self.chunks.extend(
                    [ ChunkBuffer() ] * ( self.chunk_index - size + 1 ) )

            # Increment dirty chunks if we had a key here already
            logging.debug( "Chunk is: %s" % self.chunks[ self.chunk_index ] )
            # NOTE(review): keys from save_chunk are presumably list
            # objects (hence the isinstance check); verify against the
            # Storage backend.
            if isinstance( self.chunks[ self.chunk_index ], list ) or \
                    self.chunks[ self.chunk_index ].length() == 0:
                self.dirty_chunks += 1

            logging.debug( "Dirty chunks is now: %d" % self.dirty_chunks )
            logging.debug( "Dirty flush at: %d" % dirty_flush )

            # Save the dirty chunk temporarily in memory
            self.chunks[ self.chunk_index ] = self.chunk

            # Flush if we have too many dirty chunks
            if self.dirty_chunks >= dirty_flush:
                self._flush_chunks()

    # This flushes any cached chunks
    def _flush_chunks(self):
        """Write every in-memory dirty chunk out and swap in its key.

        Also maintains chunk reference counts (lock/unlock) against the
        keys recorded in self.original_chunks.
        """
        for index in range( len( self.chunks ) ):
            if isinstance( self.chunks[ index ], ChunkBuffer ):
                logging.debug( "Flushing chunk at %d" % index )
                key = save_chunk( self.chunks[ index ].string() )
                self.chunks[ index ] = key
                logging.debug( "Key was %s" % key )
                self.dirty_chunks = 0

                # If we had an old chunk here, free it
                if len( self.original_chunks ) >= index + 1:
                    oldkey = self.original_chunks[ index ]
                    if oldkey != key:
                        # Free this chunk
                        unlock_chunk( oldkey )
                        # And keep this chunk
                        lock_chunk( key )
                    # Else chunk didn't change, don't relock or anything
                else:
                    # We did not have a chunk here so lock this chunk
                    lock_chunk( key )
                    # And extend original chunks by 1 (we are walking
                    # sequentially so we don't need to worry about padding
                    # out intermediate chunks)
                    self.original_chunks.extend( [ ChunkBuffer() ] )

                # And update the key in original chunks
                self.original_chunks[ index ] = key

        #self._update_chunk_references()

    def read(self, length, offset):
        """Return up to `length` bytes starting at byte `offset`."""
        # Bug fix: length's hex placeholder was "0x%d" (decimal printed
        # with a hex prefix); use 0x%x.
        logging.debug( "Reading from %s offset: %d (0x%x) length: %d (0x%x)" %
                ( self.orig_path, offset, offset, length, length ) )

        data_read = 0
        data = ''
        index = int( offset / self.chunk_size )
        rest = offset % self.chunk_size
        is_eof = False

        # Keep reading chunks until we have at least this much data
        while data_read < length and not is_eof:
            logging.debug( "Pulling chunk data: %d" % index )
            self._load_chunk( index )
            if self.chunk.length():
                chunk_remaining = self.chunk.length() - rest
                to_read = chunk_remaining
                data_left = length - data_read
                if data_left < chunk_remaining:
                    to_read = data_left

                logging.debug( "chunk_remaining: %d" % chunk_remaining )
                logging.debug( "data_left: %d" % data_left )
                logging.debug( "data_read: %d" % data_read )
                logging.debug( "rest: %d" % rest )
                logging.debug( "Copying %d bytes" % to_read )

                data += self.chunk.string()[ rest:(rest+to_read) ]
                data_read += to_read
                index += 1
                rest = 0
            else:
                logging.debug( "No more chunk data, bye" )
                is_eof = True

        logging.debug( "Returning %d bytes of data" % len( data ) )
        logging.debug( "Internal count was: %d" % data_read )
        return data

    def write(self, buf, offset):
        """Write `buf` at byte `offset`, null-padding any seek hole."""
        if FuseArchive.magic_profiling:
            return len( buf )

        logging.debug( "Writing to %s offset: %d (0x%x) length: %d (0x%x)" %
                ( self.orig_path, offset, offset, len( buf ), len( buf ) ) )

        index = int( offset / self.chunk_size )
        rest = offset % self.chunk_size

        logging.debug( "This chunk falls on index: %d rest: %d" % ( index, rest ) )
        logging.debug( "We have %d chunks" % len( self.chunks ) )
        logging.debug( "File size is: %d" % self.size )

        # If index is higher than the number of blocks we current have it's
        # a seek hole, so we need to extend our blocks out.  We know these
        # need to essentially be zeroed up to this size since
        if len( self.chunks ) - 1 < index:
            logging.debug( "Not enough chunks %d, need %d, extending" %
                    ( len( self.chunks ), index + 1 ) )
            # Start with our last block, in case we need to null pad it out
            this_index = len( self.chunks ) - 1
            if this_index < 0:
                this_index = 0
            while this_index < index:
                self._load_chunk( this_index )
                fill_null = self.chunk_size - self.chunk.length()
                logging.debug( "Filling this chunk with null, bytes: %d" % fill_null )
                self.chunk.append( "\0" * fill_null )
                logging.debug( "Chunk is now: %d bytes" % self.chunk.length() )
                self.chunk_modified = True
                self._save_chunk()
                this_index += 1

            # Null pad the target chunk up to the write offset
            self._load_chunk( index )
            if self.chunk.length() < rest:
                fill_null = rest - self.chunk.length()
                logging.debug( "Filling final chunk with null, bytes: %d" % fill_null )
                self.chunk.append( "\0" * fill_null )
                self.chunk_modified = True
                self._save_chunk()

        buf_offset = 0
        buf_len = len( buf )

        logging.debug( "Length: %d" % buf_len )
        while buf_offset < buf_len:
            logging.debug( "Pulling in chunk for writing: %d" % index )
            self._load_chunk( index )

            buf_remain = buf_len - buf_offset
            chunk_remain = self.chunk_size - rest
            logging.debug( "buf_remain: %d" % buf_remain )
            logging.debug( "chunk_remain: %d" % chunk_remain )

            if chunk_remain < buf_remain:
                logging.debug( "Writing %d bytes, buffer boundry" % chunk_remain )
                this_len = chunk_remain
            else:
                logging.debug( "Writing final %d bytes" % buf_remain )
                this_len = buf_remain

            logging.debug( "Bytes to copy: %d" % this_len )
            logging.debug( " buf offset: %d" % buf_offset )
            logging.debug( " chunk offset: %d" % rest )

            if FuseArchive.deep_debug:
                logging.debug( "Pre-Buf: %s" % hexlify( buf ) )
                # Bug fix: hexlify needs a byte string, not a ChunkBuffer
                logging.debug( "Pre-Chunk: %s" % hexlify( self.chunk.string() ) )

            # Check if we are appending only
            if self.chunk.length() == rest and len( buf ) <= this_len:
                logging.debug( "Doing quick append" )
                self.chunk.append( buf )
            else:
                self.chunk.replace( buf[ buf_offset:(buf_offset+this_len) ],
                        rest, rest + this_len )

            if FuseArchive.deep_debug:
                logging.debug( "Post-Buf: %s" % hexlify( buf ) )
                # Bug fix: hexlify needs a byte string, not a ChunkBuffer
                logging.debug( "Post-Chunk: %s" % hexlify( self.chunk.string() ) )

            buf_offset += this_len

            # Advance to next block
            rest = 0
            index += 1
            self.chunk_modified = True

        self._save_chunk()
        self.modified = True
        if offset + len( buf ) > self.size:
            self.size = offset + len( buf )

        logging.debug( "This chunk size is now: %d" % self.chunk.length() )
        logging.debug( "File size is now: %d" % self.size )
        logging.debug( "Num Chunks: %d" % len( self.chunks ) )

        # Mark us in the dirty cache
        dirty_cache[ self.orig_path ] = self

        return len( buf )

    # BUG: If you cp -a a file then quickly ls -l sometimes it doesn't show
    # up right? like wrong size and stuff?
    # Maybe because release doesn't return a fuse message and is async?
    def release(self, flags):
        """Flush, close the key file and drop all state (see leak note)."""
        # Deflate the file
        logging.debug( "Release: " + self.orig_path )
        self.flush()
        self.file.close()

        # There is some kind of mem leak, ChunkFile objects end up in a
        # tuple of [obj,True] for every object written and never get freed,
        # maybe a fuse bug?  Just trying to minimize the damage here by
        # freeing the big chunks of memory.
        #
        # Note that you can see this same problem in xmp.py, if you run it
        # in pdb then use objgraph to see the top objects XmpFile will be
        # near the top with this odd reference in a tuple that is owned by
        # nothing
        #del self.chunks
        #del self.chunk
        #del self.original_chunks
        self.__dict__ = {}

    def _fflush(self):
        """Flush dirty chunks and rewrite the key file if we modified data.

        Recomputes self.size from the chunk table as a consistency check
        (asserts on mismatch) and fixes up chunk reference counts.
        """
        if self.wr and self.modified:
            logging.debug( "_fflush!" )

            # Save our main data
            self._save_chunk()

            # And flush any cached chunks
            self._flush_chunks()

            save_size = self.size

            # Figure out our size based on the number of chunks + the
            # len of the final chunk
            numchunks = len( self.chunks )
            if numchunks > 0:
                # Load the last chunk
                logging.debug( "We have %d chunks, calculating size" % numchunks )
                self._load_chunk( numchunks - 1 )
                self.size = ( numchunks - 1 ) * self.chunk_size + \
                        self.chunk.length()
            else:
                logging.debug( "No chunks, setting size to zero" )
                self.size = 0

            # If this assert fails then write/ftruncate failed to set
            # things up right somewhere
            assert save_size == self.size, "Calculated size of " \
                    + self.orig_path + " = " + str( self.size ) \
                    + " doesn't match internal size " + str( save_size ) \
                    + "\nProbably a bug in write or ftruncate!"
            logging.debug( "Size calculated is: %d (0x%x)" % ( self.size, self.size ) )

            # The indirect block: size + chunk table, stored as a chunk
            # itself; only its key goes in the real file
            key = save_chunk( Serializer.dumps( {
                    'size': self.size,
                    'chunks': self.chunks,
                    'chunk_size': self.chunk_size } ) )
            logging.debug( "Saved indirect file to key %s, saving key in main file" % key )
            Serializer.dumpfh( self.file, key )

            # Update file ref counts
            if key != self.original_key:
                logging.debug( "File key changed updating references" )
                if self.original_key != None:
                    unlock_chunk( self.original_key )
                lock_chunk( key )
                self.original_key = key

            self._update_chunk_references()

            # Not dirty anymore
            if self.orig_path in dirty_cache:
                del dirty_cache[ self.orig_path ]

        logging.debug( "_fflush exit" )
        return 1

    def _update_chunk_references(self):
        """Re-sync chunk lock counts after the chunk table changed."""
        # Now update our chunk ref counts
        logging.debug( "Updating chunk references" )
        for index in range( len( self.chunks ) ):
            # Is this chunk changed from what was here before?
            oldkey = None
            key = self.chunks[ index ]
            changed = False

            # Is the old chunks at least this big?
            if index >= len( self.original_chunks ):
                logging.debug( "No old chunk at this spot, changed for sure" )
                changed = True
            else:
                oldkey = self.original_chunks[ index ]
                if oldkey != key:
                    logging.debug( "Key has changed at index %d" % index )
                    changed = True
                    logging.debug( "%s is now %s" % (oldkey, key) )

            if changed:
                logging.debug( "Chunk at index %d has changed" % index )
                if oldkey != None:
                    unlock_chunk( oldkey )
                lock_chunk( key )

        # Free any unused chunks because of ftruncate
        logging.debug( "Freeing chunks beyond our new chunk count" )
        index = len( self.chunks )
        while index < len( self.original_chunks ):
            logging.debug( "Unlocking chunk at index %d" % index )
            unlock_chunk( self.original_chunks[ index ] )
            index += 1

        # Update our original_chunks since we've locked/unlocked some
        # things
        self.original_chunks = copy.deepcopy( self.chunks )

    def pre_unlink(self):
        """Drop every chunk reference (and the key) before unlink."""
        # Release all our blocks and our main file key
        logging.debug( "Unlocking all our chunks for unlink" )
        for key in self.chunks:
            unlock_chunk( key )
        unlock_chunk( self.original_key )

    # Currently we treat fsync as flush since we don't keep any data
    # hanging around anyway in fh stuff
    def fsync(self, isfsyncfile):
        """Flush everything; datasync distinction is not honored."""
        logging.debug( "fsync " + self.orig_path )
        self._fflush()
        #if isfsyncfile and hasattr(os, 'fdatasync'):
        #    os.fdatasync(self.fd)
        #else:
        #    os.fsync(self.fd)

    def flush(self):
        """Flush dirty state out to the backing key file."""
        logging.debug( "flush " + self.orig_path )
        self._fflush()

    def fgetattr(self):
        """stat() the key file, then override with the logical size."""
        logging.debug( "Overridding fgetattr" )
        stats = FuseArchiveStat( os.lstat( "./tree" + self.orig_path ) )

        # Fixed in write?
        #if self.modified:
        #    We would need to fsync here to recalc size, but don't do
        #    it unless modified? otherwise simple getattr will be
        #    rewriting a ton of files
        #    print "WARNING: self.modified causes fgetattr to be incorrect!"
        stats.overstat( self.size )
        return stats

    def ftruncate(self, length):
        """Grow (null-pad) or shrink the file to exactly `length` bytes."""
        if not self.wr:
            # Bug fix: errno has no "IOError" attribute; return a negative
            # errno as FUSE expects (handle not open for writing)
            return -errno.EBADF

        curr_chunks = len( self.chunks )
        need_chunks = ( length / self.chunk_size )
        extra_bytes = length % self.chunk_size
        logging.debug( "Ftruncate - %d (0x%x)" % ( length, length ) )
        logging.debug( " - self.size: %d" % self.size )
        logging.debug( " - curr_chunks: %d" % curr_chunks )
        logging.debug( " - need_chunks: %d" % need_chunks )
        logging.debug( " - extra_bytes: %d" % extra_bytes )

        if extra_bytes:
            logging.debug( "Need an extra chunk" )
            need_chunks += 1

        self._load_chunk( 0 )

        if length == 0:
            logging.debug( "Creating 0 chunk file" )
            self.chunks = []
            self.chunk = ChunkBuffer()
        elif self.size <= length:
            logging.debug( "Need to pad out file, writing/seeking to %d" % length )

            # Just write out null bytes to the length requested, write will
            # do this for us if we specify the offset
            self.write( '', length )
        else:
            logging.debug( "Truncating chunks" )
            while True:
                logging.debug( "Need chunks: %d curr: %d" % ( need_chunks, curr_chunks ) )
                if need_chunks == curr_chunks:
                    break

                logging.debug( "Deleting chunk %d" % self.chunk_index )
                self.chunks.pop()
                curr_chunks = len( self.chunks )

            # Now make sure this chunk is the right size, first load the
            # last chunk
            if len( self.chunks ):
                self._load_chunk( len( self.chunks ) - 1 )
                logging.debug( "Loaded final chunk, len: %d" % self.chunk.length() )

                # Now truncate this item if needed
                if self.chunk.length() > extra_bytes:
                    logging.debug( "Truncating final chunk to %d" % extra_bytes )
                    self.chunk.truncate( extra_bytes )
                    logging.debug( "Chunk is now: %d bytes" % self.chunk.length() )

            self.chunk_modified = True

        self.modified = True
        self.size = length
        self._load_chunk( 0 )

        logging.debug( "ftruncate complete" )
        self._fflush()

    def lock(self, cmd, owner, **kw):
        """Advisory locking is unsupported; always reports success."""
        logging.debug( "WARNING: locking unsupported" )
        return 1

        # NOTE(review): everything below is unreachable (dead demo code);
        # it also references self.fd, which this class never sets.
        # The code here is much rather just a demonstration of the locking
        # API than something which actually was seen to be useful.

        # Advisory file locking is pretty messy in Unix, and the Python
        # interface to this doesn't make it better.
        # We can't do fcntl(2)/F_GETLK from Python in a platform independent
        # way. The following implementation *might* work under Linux.
        #
        # if cmd == fcntl.F_GETLK:
        #     import struct
        #
        #     lockdata = struct.pack('hhQQi', kw['l_type'], os.SEEK_SET,
        #                            kw['l_start'], kw['l_len'], kw['l_pid'])
        #     ld2 = fcntl.fcntl(self.fd, fcntl.F_GETLK, lockdata)
        #     flockfields = ('l_type', 'l_whence', 'l_start', 'l_len', 'l_pid')
        #     uld2 = struct.unpack('hhQQi', ld2)
        #     res = {}
        #     for i in xrange(len(uld2)):
        #         res[flockfields[i]] = uld2[i]
        #
        #     return fuse.Flock(**res)

        # Convert fcntl-ish lock parameters to Python's weird
        # lockf(3)/flock(2) medley locking API...
        op = { fcntl.F_UNLCK : fcntl.LOCK_UN,
               fcntl.F_RDLCK : fcntl.LOCK_SH,
               fcntl.F_WRLCK : fcntl.LOCK_EX }[kw['l_type']]
        if cmd == fcntl.F_GETLK:
            return -errno.EOPNOTSUPP
        elif cmd == fcntl.F_SETLK:
            if op != fcntl.LOCK_UN:
                op |= fcntl.LOCK_NB
        elif cmd == fcntl.F_SETLKW:
            pass
        else:
            return -errno.EINVAL

        fcntl.lockf(self.fd, op, kw['l_start'], kw['l_len'])

    @staticmethod
    def get_dirty_file( path ):
        """Return the open ChunkFile for `path` if it has unflushed writes."""
        if path in dirty_cache:
            return( dirty_cache[ path ] )
        else:
            return None