Diffstat (limited to 'FuseArchive/ChunkFile.py')
-rw-r--r--  FuseArchive/ChunkFile.py  536
1 file changed, 536 insertions, 0 deletions
diff --git a/FuseArchive/ChunkFile.py b/FuseArchive/ChunkFile.py
new file mode 100644
index 0000000..546e9c0
--- /dev/null
+++ b/FuseArchive/ChunkFile.py
@@ -0,0 +1,536 @@
+import logging, os, errno, fcntl, fuse, FuseArchive
+from binascii import hexlify
+from FuseArchive.Serializer import Serializer
+
+# These control some of the file output
+magic_blocksize = 1024 * 128
+# Use a tiny block size to debug writes, so you can use a smaller test file
+#magic_blocksize = 1024
+magic_depth = 5
+gzip_compress_level = 6
+chunkstyle = 'fs'
+# When set, write() returns immediately without storing any data
+magic_profiling = False
+
+# Memory for dirty blocks, per file (1M)
+dirty_size = 1024 * 1024 * 1
+# This is the number of actual blocks in that size
+dirty_flush = int( dirty_size / magic_blocksize )
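+# e.g. with the defaults above: int( 1048576 / 131072 ) = 8 dirty chunks
+# are buffered in memory before a flush is forced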
+
+# This is a cache of open files by path, to fix the lseek == size problem.
+# That problem causes a failure in fsx-linux because to do
+# lseek(fd, 0, SEEK_END) it apparently does a getattr to find the file
+# length, then subtracts the offset from that to pass to write or whatever,
+# since the offset is passed to write and we don't maintain one internally.
+# xmp.py also fails this test.
+dirty_cache = {}
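+
+# load_chunk() and save_chunk() are used below but are not defined in this
+# file; the real versions presumably come from the chunk store selected by
+# `chunkstyle` above (likely using magic_depth levels of directories). The
+# pair below is only an illustrative stand-in, not the original
+# implementation: a flat content-addressed store that names each chunk
+# after the SHA-1 of its data, and wraps keys in lists because the code
+# below treats list entries in the chunk table as flushed keys.
+import hashlib, zlib
+
+def save_chunk( data ):
+    # Hash the raw data for the key and store the compressed payload
+    key = [ hashlib.sha1( data ).hexdigest() ]
+    subdir = os.path.join( "./storage", key[ 0 ][ :2 ] )
+    if not os.path.isdir( subdir ):
+        os.makedirs( subdir )
+    f = open( os.path.join( subdir, key[ 0 ] ), 'wb' )
+    f.write( zlib.compress( data, gzip_compress_level ) )
+    f.close()
+    return key
+
+def load_chunk( key ):
+    # Inverse of save_chunk(): fetch and decompress a chunk by its key
+    f = open( os.path.join( "./storage", key[ 0 ][ :2 ], key[ 0 ] ), 'rb' )
+    data = zlib.decompress( f.read() )
+    f.close()
+    return data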
+
+def flag2mode(flags):
+ md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
+ m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
+
+ if flags & os.O_APPEND:
+ m = m.replace('w', 'a', 1)
+
+ return m
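+
+# For example (illustrative only):
+#   flag2mode( os.O_RDONLY )             -> 'r'
+#   flag2mode( os.O_WRONLY )             -> 'w'
+#   flag2mode( os.O_RDWR | os.O_APPEND ) -> 'a+'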
+
+class FuseArchiveStat(fuse.Stat):
+ def __init__(self, stat):
+ self.st_mode = stat.st_mode
+ self.st_ino = stat.st_ino
+ self.st_dev = stat.st_dev
+ self.st_rdev = stat.st_rdev
+ self.st_nlink = stat.st_nlink
+ self.st_uid = stat.st_uid
+ self.st_gid = stat.st_gid
+ self.st_size = stat.st_size
+ self.st_atime = stat.st_atime
+ self.st_mtime = stat.st_mtime
+        self.st_ctime = stat.st_ctime
+ self.st_blocks = stat.st_blocks
+ self.st_blksize = stat.st_blksize
+
+ def overstat( self, size ):
+ self.st_size = size
+ # Yeah we shouldn't always just add 1
+ self.st_blocks = int( self.st_size / 512 ) + 1
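+
+    # For example, overstat( 1000 ) reports st_size = 1000 and
+    # st_blocks = int( 1000 / 512 ) + 1 = 2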
+
+class ChunkFile(object):
+
+ def __init__(self, path, flags, *mode):
+ # Inflate the file
+ logging.debug( "Init file: " + path )
+ self.orig_path = path
+
+ # init rw and offset
+ self.rd = False
+ self.wr = False
+ self.size = 0
+ self.modified = False
+
+ # This is the current in-memory chunk and offset in to data[]
+        self.chunk_cache = {}
+ self.chunk = ''
+ self.chunk_index = -1
+ self.chunk_modified = False
+ self.chunk_size = magic_blocksize
+ self.dirty_chunks = 0
+
+ # The chunk table
+ self.chunks = []
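+        # Entries in self.chunks take one of three forms:
+        #   str  - dirty chunk data cached in memory, not yet flushed
+        #   list - the key of a flushed chunk, retrievable via load_chunk()
+        #   ''   - an empty placeholder (e.g. a hole left by a seek past EOF)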
+
+ # TODO: Better flag handling here?
+        # os.O_RDONLY is 0, so `flags & os.O_RDONLY` can never be true;
+        # mask with os.O_ACCMODE and compare instead
+        accmode = flags & os.O_ACCMODE
+        if accmode == os.O_RDONLY:
+            self.rd = True
+
+        if accmode == os.O_RDWR:
+            self.rd = True
+            self.wr = True
+
+        if accmode == os.O_WRONLY:
+            self.wr = True
+
+ if flags & os.O_APPEND:
+ self.wr = True
+
+        preexist = os.path.exists( "./tree" + self.orig_path )
+
+ # Open the file now and keep the fh around so that cp -a on r/o
+ # files works (in the create a read-only file for writing case)
+ src = "./tree" + path
+ logging.debug( "Saving fh for " + src )
+ nflags = os.O_RDWR | os.O_APPEND
+ if flags & os.O_CREAT:
+ logging.debug( "Adding O_CREAT" )
+ nflags = nflags | os.O_CREAT
+
+ self.file = os.fdopen( os.open( src, nflags, *mode ),
+ flag2mode( nflags ) )
+
+ if preexist:
+ # Read in file info table
+ logging.debug( "Unpickling: %s" % self.file )
+ # TODO: return an IO error if inflating fails
+ try:
+ magic = Serializer.loadfh( self.file )
+ logging.debug( "Got data: %s" % magic )
+ self.size = magic[ 'size' ]
+ self.chunks = magic[ 'chunks' ]
+ self.chunk_size = magic[ 'chunk_size' ]
+ except Exception, e:
+ logging.critical( self.orig_path + ": " + str( e ) )
+ else:
+ if self.wr:
+ logging.debug( "File doesn't exist and we're going to write, creating temp empty file" )
+ self.modified = True
+ self.flush()
+
+ self.direct_io = False
+ self.keep_cache = False
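+        # keep_cache is off, presumably so the kernel never reuses cached
+        # pages across opens; the logical data we return never matches the
+        # raw container file on disk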
+
+ logging.debug( "%s init complete" % self )
+
+ def _load_chunk( self, index ):
+ # If the current chunk is the same as the chunk we're loading
+ # just return
+ logging.debug( "_load_chunk: %d" % index )
+
+ if index == self.chunk_index:
+ logging.debug( "Load chunk is same as current chunk, all done" )
+ return
+
+ # Save this chunk if modified
+ self._save_chunk()
+
+ logging.debug( "Loading chunk %d" % index )
+ key = None
+
+ size = len( self.chunks )
+ if index >= size:
+ logging.debug( "Index doesn't exist" )
+ else:
+ key = self.chunks[ index ]
+
+ if key:
+ if isinstance( key, str ):
+ logging.debug( "Found cached dirty page" )
+ self.chunk = key
+ else:
+ logging.debug( "Index: %s" % key )
+ self.chunk = load_chunk( key )
+ else:
+ logging.debug( "No chunk at this index, loading nothing" )
+ self.chunk = ''
+
+ logging.debug( "Loaded chunk of length: %d" % len( self.chunk ) )
+
+ self.chunk_index = index
+ self.chunk_modified = False
+
+ # This simply puts the chunk data inside our current chunks at chunk_index
+ def _save_chunk(self):
+ if self.chunk_modified:
+ logging.debug( "Saving chunk %d" % self.chunk_index )
+
+ # Make sure we have room for this chunk
+ size = len( self.chunks )
+ if self.chunk_index >= size:
+                self.chunks.extend( [ '' ] * ( self.chunk_index - size + 1 ) )
+
+            # Count this chunk as dirty unless the slot already holds
+            # in-memory (string) chunk data; a key (list) or an empty
+            # placeholder means this is newly dirtied data
+            if isinstance( self.chunks[ self.chunk_index ], list ) or \
+                len( self.chunks[ self.chunk_index ] ) == 0:
+                self.dirty_chunks += 1
+ self.dirty_chunks += 1
+ logging.debug( "Dirty chunks is now: %d" % self.dirty_chunks )
+ logging.debug( "Dirty flush at: %d" % dirty_flush )
+
+ # Save the dirty chunk temporarily in memory
+ self.chunks[ self.chunk_index ] = self.chunk
+
+ # Flush if we have too many dirty chunks
+ if self.dirty_chunks > dirty_flush:
+ self._flush_chunks()
+
+ # This flushes any cached chunks
+ def _flush_chunks(self):
+ for index in range( len( self.chunks ) ):
+ if isinstance( self.chunks[ index ], str ):
+ logging.debug( "Flushing chunk at %d" % index )
+ key = save_chunk( self.chunks[ index ] )
+ self.chunks[ index ] = key
+ logging.debug( "Key was %s" % key )
+ self.dirty_chunks = 0
+
+ def read(self, length, offset):
+ logging.debug( "Reading from %s offset: %d (0x%x) length: %d (0x%d)" %
+ ( self.orig_path, offset, offset, length, length ) )
+
+ data_read = 0
+ data = ''
+ index = int( offset / self.chunk_size )
+ rest = offset % self.chunk_size
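+        # e.g. with the default 128 KiB chunks, a read at offset 131074
+        # starts in chunk index 1 at byte 2 (131074 = 1 * 131072 + 2)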
+ is_eof = False
+
+ # Keep reading chunks until we have at least this much data
+ while data_read < length and not is_eof:
+ logging.debug( "Pulling chunk data: %d" % index )
+ self._load_chunk( index )
+ if len(self.chunk):
+ chunk_remaining = len(self.chunk) - rest
+ to_read = chunk_remaining
+ data_left = length - data_read
+ if data_left < chunk_remaining:
+ to_read = data_left
+
+ logging.debug( "chunk_remaining: %d" % chunk_remaining )
+ logging.debug( "data_left: %d" % data_left )
+ logging.debug( "data_read: %d" % data_read )
+ logging.debug( "rest: %d" % rest )
+ logging.debug( "Copying %d bytes" % to_read )
+
+ data += self.chunk[ rest:(rest+to_read) ]
+ data_read += to_read
+ index += 1
+ rest = 0
+ else:
+ logging.debug( "No more chunk data, bye" )
+ is_eof = True
+
+ logging.debug( "Returning %d bytes of data" % len( data ) )
+ logging.debug( "Internal count was: %d" % data_read )
+ return data
+
+ def write(self, buf, offset):
+ if magic_profiling:
+ return len( buf )
+
+ logging.debug( "Writing to %s offset: %d (0x%x) length: %d (0x%x)" %
+ ( self.orig_path, offset, offset, len( buf ), len( buf ) ) )
+
+ index = int( offset / self.chunk_size )
+ rest = offset % self.chunk_size
+
+ logging.debug( "This chunk falls on index: %d rest: %d" % ( index, rest ) )
+ logging.debug( "We have %d chunks" % len( self.chunks ) )
+ logging.debug( "File size is: %d" % self.size )
+
+        # If index is higher than the number of chunks we currently have,
+        # it's a seek hole, so we need to extend our blocks out. We know
+        # these need to essentially be zeroed up to this size since the
+        # data in between was never written.
+ if len( self.chunks ) - 1 < index:
+ logging.debug( "Not enough chunks %d, need %d, extending" %
+ ( len( self.chunks ), index + 1 ) )
+ this_index = 0
+ while this_index < index:
+ self._load_chunk( this_index )
+ fill_null = self.chunk_size - len(self.chunk)
+ logging.debug( "Filling this chunk with null, bytes: %d" % fill_null )
+ self.chunk += "\0" * fill_null
+ logging.debug( "Chunk is now: %d bytes" % len( self.chunk) )
+ self.chunk_modified = True
+ self._save_chunk()
+ this_index += 1
+
+ self._load_chunk( index )
+
+ # Now check if this chunk needs to be extended
+ if len( self.chunk ) < rest:
+ fill_null = rest - len(self.chunk)
+ logging.debug( "Filling final chunk with null, bytes: %d" % fill_null )
+ self.chunk += "\0" * fill_null
+ self.chunk_modified = True
+ self._save_chunk()
+
+ buf_offset = 0
+ buf_len = len(buf)
+
+ logging.debug( "Length: %d" % buf_len )
+        while buf_offset < buf_len:
+ logging.debug( "Pulling in chunk for writing: %d" % index )
+ self._load_chunk( index )
+ buf_remain = buf_len - buf_offset
+ chunk_remain = self.chunk_size - rest
+
+ logging.debug( "buf_remain: %d" % buf_remain )
+ logging.debug( "chunk_remain: %d" % chunk_remain )
+
+ if chunk_remain < buf_remain:
+ logging.debug( "Writing %d bytes, buffer boundry" % chunk_remain )
+ this_len = chunk_remain
+ else:
+ logging.debug( "Writing final %d bytes" % buf_remain )
+ this_len = buf_remain
+
+ logging.debug( "Bytes to copy: %d" % this_len )
+ logging.debug( " buf offset: %d" % buf_offset )
+ logging.debug( " chunk offset: %d" % rest )
+
+ if FuseArchive.deep_debug:
+ logging.debug( "Pre-Buf: %s" % hexlify(buf) )
+ logging.debug( "Pre-Chunk: %s" % hexlify(self.chunk) )
+
+            # Since python doesn't do in-place string assignment the way
+            # splice() does, we reconstruct the data by joining slices
+            # (first the chars to keep, then our piece of buf, then
+            # everything that would have been after it)
+ self.chunk = self.chunk[ :rest ] + \
+ buf[ buf_offset:(buf_offset+this_len) ] + \
+ self.chunk[ (rest + this_len): ]
+
+ if FuseArchive.deep_debug:
+ logging.debug( "Post-Buf: %s" % hexlify(buf) )
+ logging.debug( "Post-Chunk: %s" % hexlify(self.chunk) )
+
+ buf_offset += this_len
+
+ # Advance to next block
+ rest = 0
+ index += 1
+ self.chunk_modified = True
+
+ self._save_chunk()
+ self.modified = True
+ if offset + len(buf) > self.size:
+ self.size = offset + len(buf)
+
+ logging.debug( "This chunk size is now: %d" % len( self.chunk ) )
+ logging.debug( "File size is now: %d" % self.size )
+ logging.debug( "Num Chunks: %d" % len( self.chunks ) )
+
+ # Mark us in the dirty cache
+ dirty_cache[ self.orig_path ] = self
+
+ return len(buf)
+
+ # BUG: If you cp -a a file then quickly ls -l sometimes it doesn't show
+ # up right? like wrong size and stuff?
+ # Maybe because release doesn't return a fuse message and is async?
+ def release(self, flags):
+ # Deflate the file
+ logging.debug( "Release: " + self.orig_path )
+ self.flush()
+ self.file.close()
+
+ def _fflush(self):
+ if self.wr and self.modified:
+ logging.debug( "_fflush!" )
+ # Save our main data
+ self._save_chunk()
+
+ # And flush any cached chunks
+ self._flush_chunks()
+
+ save_size = self.size
+
+ # Figure out our size based on the number of chunks + the
+ # len of the final chunk
+ numchunks = len( self.chunks )
+ if numchunks > 0:
+ # Load the last chunk
+ logging.debug( "We have %d chunks, calculating size" % numchunks )
+ self._load_chunk( numchunks - 1 )
+ self.size = ( numchunks - 1 ) * self.chunk_size + \
+ len( self.chunk )
+ else:
+ logging.debug( "No chunks, setting size to zero" )
+ self.size = 0
+
+ # If this assert fails then write/ftruncate failed to set
+ # things up right somewhere
+ assert save_size == self.size, "Calculated size of " \
+ + self.orig_path + " = " + str( self.size ) \
+ + " doesn't match internal size " + str( save_size ) \
+ + "\nProbably a bug in write or ftruncate!"
+ logging.debug( "Size calculated is: %d (0x%x)" % ( self.size, self.size ) )
+
+ Serializer.dumpfh( self.file, {
+ 'size': self.size,
+ 'chunks': self.chunks,
+ 'chunk_size': self.chunk_size
+ } )
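+        # Note that the container file under ./tree ends up holding just
+        # this serialized header (size, chunk keys, chunk size); the chunk
+        # data itself lives in the chunk store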
+
+ # Not dirty anymore
+ if self.orig_path in dirty_cache:
+ del dirty_cache[ self.orig_path ]
+
+
+ logging.debug( "_fflush exit" )
+ return 1
+
+
+ # Currently we treat fsync as flush since we don't keep any data
+ # hanging around anyway in fh stuff
+ def fsync(self, isfsyncfile):
+ logging.debug( "fsync " + self.orig_path )
+ self._fflush()
+ #if isfsyncfile and hasattr(os, 'fdatasync'):
+ # os.fdatasync(self.fd)
+ #else:
+ # os.fsync(self.fd)
+
+ def flush(self):
+ logging.debug( "flush " + self.orig_path )
+ self._fflush()
+
+ def fgetattr(self):
+ logging.debug( "Overridding fgetattr" )
+ stats = FuseArchiveStat( os.lstat( "./tree" + self.orig_path ) )
+
+ # Fixed in write?
+ #if self.modified:
+ # We would need to fsync here to recalc size, but don't do
+ # it unless modified? otherwise simple getattr will be
+ # rewriting a ton of files
+ # print "WARNING: self.modified causes fgetattr to be incorrect!"
+
+ stats.overstat( self.size )
+ return stats
+
+ def ftruncate(self, length):
+        if not self.wr:
+            # Not opened for writing; python-fuse expects a negative errno
+            # here rather than an exception class
+            return -errno.EIO
+
+ curr_chunks = len( self.chunks )
+ need_chunks = ( length / self.chunk_size )
+ extra_bytes = length % self.chunk_size
+ logging.debug( "Ftruncate - %d (0x%x)" % ( length, length ) )
+ logging.debug( " - self.size: %d" % self.size )
+ logging.debug( " - curr_chunks: %d" % curr_chunks )
+ logging.debug( " - need_chunks: %d" % need_chunks )
+ logging.debug( " - extra_bytes: %d" % extra_bytes )
+
+ if extra_bytes:
+ logging.debug( "Need an extra chunk" )
+ need_chunks += 1
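+        # e.g. truncating to 300000 bytes with the default 128 KiB chunks
+        # gives need_chunks = 2 + 1 and extra_bytes = 37856, i.e. two full
+        # chunks plus one partial chunk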
+
+ self._load_chunk( 0 )
+
+ if length == 0:
+ logging.debug( "Creating 0 chunk file" )
+ self.chunks = []
+ self.chunk = ''
+ elif self.size <= length:
+ logging.debug( "Need to pad out file, writing/seeking to %d" % length )
+
+ # Just write out null bytes to the length requested, write will do this for us if we specify the offset
+ self.write( '', length )
+ else:
+ logging.debug( "Truncating chunks" )
+ while True:
+ logging.debug( "Need chunks: %d curr: %d" % ( need_chunks, curr_chunks ) )
+ if need_chunks == curr_chunks:
+ break
+
+ logging.debug( "Deleting chunk %d" % self.chunk_index )
+ self.chunks.pop()
+ curr_chunks = len( self.chunks )
+
+ # Now make sure this chunk is the right size, first load the
+ # last chunk
+ if len( self.chunks ):
+ self._load_chunk( len( self.chunks ) - 1 )
+ logging.debug( "Loaded final chunk, len: %d" % len( self.chunk ) )
+
+ # Now truncate this item if needed
+ if len( self.chunk ) > extra_bytes:
+ logging.debug( "Truncating final chunk to %d" % extra_bytes )
+ self.chunk = self.chunk[ :extra_bytes ]
+ logging.debug( "Chunk is now: %d bytes" % len( self.chunk ) )
+
+ self.chunk_modified = True
+ self.modified = True
+ self.size = length
+ self._load_chunk( 0 )
+
+ logging.debug( "ftruncate complete" )
+ self._fflush()
+
+    def lock(self, cmd, owner, **kw):
+        logging.debug( "WARNING: locking unsupported" )
+        # We bail out here, so the xmp.py-derived locking code below is
+        # kept for reference but never reached
+        return 1
+
+    # The code here is much more a demonstration of the locking API than
+    # something which was actually seen to be useful.
+
+    # Advisory file locking is pretty messy in Unix, and the Python
+    # interface to this doesn't make it better.
+    # We can't do fcntl(2)/F_GETLK from Python in a platform-independent
+    # way. The following implementation *might* work under Linux.
+ #
+ # if cmd == fcntl.F_GETLK:
+ # import struct
+ #
+ # lockdata = struct.pack('hhQQi', kw['l_type'], os.SEEK_SET,
+ # kw['l_start'], kw['l_len'], kw['l_pid'])
+ # ld2 = fcntl.fcntl(self.fd, fcntl.F_GETLK, lockdata)
+ # flockfields = ('l_type', 'l_whence', 'l_start', 'l_len', 'l_pid')
+ # uld2 = struct.unpack('hhQQi', ld2)
+ # res = {}
+ # for i in xrange(len(uld2)):
+ # res[flockfields[i]] = uld2[i]
+ #
+ # return fuse.Flock(**res)
+
+ # Convert fcntl-ish lock parameters to Python's weird
+ # lockf(3)/flock(2) medley locking API...
+ op = { fcntl.F_UNLCK : fcntl.LOCK_UN,
+ fcntl.F_RDLCK : fcntl.LOCK_SH,
+ fcntl.F_WRLCK : fcntl.LOCK_EX }[kw['l_type']]
+ if cmd == fcntl.F_GETLK:
+ return -errno.EOPNOTSUPP
+ elif cmd == fcntl.F_SETLK:
+ if op != fcntl.LOCK_UN:
+ op |= fcntl.LOCK_NB
+ elif cmd == fcntl.F_SETLKW:
+ pass
+ else:
+ return -errno.EINVAL
+
+    fcntl.lockf( self.file.fileno(), op, kw['l_start'], kw['l_len'] )
+
+ @staticmethod
+ def get_dirty_file( path ):
+ if path in dirty_cache:
+            return dirty_cache[ path ]
+ else:
+ return None
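+
+# Illustrative use of the dirty cache (an assumption about the calling
+# filesystem code, not part of this commit): a getattr handler can check
+# it first so files with unflushed writes report the right size:
+#
+#   f = ChunkFile.get_dirty_file( path )
+#   if f:
+#       return f.fgetattr()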