Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:15

0001 #!/usr/bin/env python3
0002 
0003 import time
0004 import os
0005 import sys
0006 import zlib
0007 import struct
0008 
DECOMPRESS_BUF_SIZE = 4*1024*1024

# Trailing LEN/NLEN bytes of the empty stored block that zlib emits on a
# sync/full flush (RFC 1951, section 3.2.4).  Deflate data right after it
# starts on a byte-aligned block boundary, so raw decoding can resume there,
# provided the writer used full flushes (no back-references across it).
# NOTE: these four bytes can also occur by chance inside compressed data.
BLOCK_MAGIC = b"\x00\x00\xFF\xFF"
ENDLINE_MAGIC = b"\n"


# we treat everything as a deflate stream
# gzip header has no power here
def strip_gzip_header(body):
    """Return *body* with its leading gzip member header (RFC 1952) removed.

    The payload is decoded as raw deflate, so the header fields are ignored;
    they are parsed only far enough to skip them.

    :param body: bytes that start with a gzip header.
    :returns: the bytes following the header (the deflate payload).
    :raises ValueError: if the gzip magic is missing or the header is
        truncated.
    """
    if body[0:2] != b"\x1f\x8b":
        raise ValueError("not a gzip stream (bad magic)")
    if len(body) < 10:
        raise ValueError("truncated gzip header")

    # CM (method), FLG (flags), MTIME; the XFL and OS bytes are skipped.
    _method, flags, _mtime = struct.unpack("<BBIxx", body[2:10])

    FHCRC    = 0x02
    FEXTRA   = 0x04
    FNAME    = 0x08
    FCOMMENT = 0x10

    i = 10

    if flags & FEXTRA:
        if len(body) < i + 2:
            raise ValueError("truncated gzip header")
        xlen, = struct.unpack_from("<H", body, i)
        i += 2 + xlen

    def skip_until_zero(ix):
        # Skip a NUL-terminated field (file name / comment), NUL included.
        end = body.find(b"\x00", ix)
        if end == -1:
            raise ValueError("truncated gzip header")
        return end + 1

    if flags & FNAME:
        i = skip_until_zero(i)
    if flags & FCOMMENT:
        i = skip_until_zero(i)
    if flags & FHCRC:
        i += 2

    if i > len(body):
        raise ValueError("truncated gzip header")
    return body[i:]


class Decoder:
    """Decode and print the tail of a (possibly still growing) gzip file.

    Decoding starts either at the gzip header (when reading from file
    offset 0) or right after the first BLOCK_MAGIC flush marker found in the
    data read; decoded bytes are written to stdout.  Resuming mid-file only
    works if the writer emits full flushes (Z_FULL_FLUSH).
    """

    def __init__(self, fname, last_n_lines):
        """Open *fname* for binary reading.

        :param fname: path of the gzip file.
        :param last_n_lines: number of trailing lines ``initial()`` prints.
        """
        self.f = open(fname, "rb")
        self.last_n_lines = last_n_lines
        # File bytes consumed so far; None until initial_synchronize() runs.
        self.known_size = None
        self.reset()

    def close(self):
        """Close the underlying file."""
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
        return False

    def reset(self):
        """Drop synchronization and start a fresh raw-deflate decompressor."""
        self.sync = False
        self.zstream = zlib.decompressobj(-zlib.MAX_WBITS)

    def decode(self, bytes, if_start=False):
        """Decode a newly read chunk and return the decompressed bytes.

        :param bytes: raw file bytes that follow the previously decoded chunk.
        :param if_start: True when the chunk starts at file offset 0, i.e.
            with the gzip header.
        :returns: decompressed bytes; ``b""`` for empty input, while not yet
            synchronized, or once the deflate stream has ended.
        :raises ValueError: if *if_start* is set and the gzip header is bad.
        """
        data = bytes  # parameter name kept for keyword-argument callers
        if not data or self.zstream is None:
            return b""

        if if_start:
            data = strip_gzip_header(data)
            self.sync = True
        elif not self.sync:
            x = data.find(BLOCK_MAGIC)
            if x == -1:
                # not in sync, can't decode
                return b""
            data = data[x + len(BLOCK_MAGIC):]
            self.sync = True

        text = self.zstream.decompress(data)
        if self.zstream.eof:
            # The deflate stream is complete; what remains (gzip CRC32 and
            # ISIZE trailer, or anything after it) is not needed.
            text += self.zstream.flush()
            self.zstream = None

        return text

    def output_line(self, line):
        """Write decoded bytes to stdout and flush immediately."""
        stream = getattr(sys.stdout, "buffer", None)
        if stream is None:
            # stdout replaced by a text-only stream (e.g. io.StringIO)
            sys.stdout.write(line.decode("utf-8", "replace"))
            sys.stdout.flush()
            return
        stream.write(line)
        stream.flush()

    def initial_synchronize(self):
        """Decode the last DECOMPRESS_BUF_SIZE bytes of the file.

        Leaves the file positioned after the bytes read and records that
        position in ``known_size``.

        :returns: the decoded bytes (``b""`` if no sync point was found).
        """
        f = self.f

        f.seek(0, os.SEEK_END)
        end = f.tell()
        start = max(0, end - DECOMPRESS_BUF_SIZE)
        f.seek(start, os.SEEK_SET)

        body = f.read(end - start)
        text = self.decode(body, start == 0)

        # Use what was actually read, in case the file shrank meanwhile.
        self.known_size = start + len(body)
        return text

    @staticmethod
    def _tail_lines(text, n_lines):
        """Return the last *n_lines* lines of *text* (bytes), like ``tail -n``.

        A trailing newline terminates the last line rather than starting an
        empty one; an unterminated final line still counts as a line.
        """
        if n_lines <= 0 or not text:
            return b""
        terminated = text.endswith(ENDLINE_MAGIC)
        body = text[:-len(ENDLINE_MAGIC)] if terminated else text
        lines = body.rsplit(ENDLINE_MAGIC, n_lines)
        if len(lines) > n_lines:
            # The first piece is everything before the wanted lines.
            lines = lines[1:]
        out = ENDLINE_MAGIC.join(lines)
        return out + ENDLINE_MAGIC if terminated else out

    def initial(self):
        """Synchronize and print the last ``last_n_lines`` decoded lines."""
        text = self.initial_synchronize()
        self.output_line(self._tail_lines(text, self.last_n_lines))

    def follow(self, poll_interval=1.0):
        """Poll the file and print newly decoded data until the stream ends.

        If the file shrinks, the decoder re-synchronizes on the new contents;
        data already present at that point is not printed.

        :param poll_interval: seconds to sleep when the file has not grown.
        :raises Exception: if ``initial()`` has not been called first.
        """
        if self.known_size is None:
            raise Exception("Call initial() first.")

        while self.zstream is not None:
            size = os.fstat(self.f.fileno()).st_size

            if size < self.known_size:
                sys.stderr.write("%s: file truncated\n" % sys.argv[0])
                sys.stderr.write("%s: waiting for the next write\n" % sys.argv[0])
                sys.stderr.flush()

                self.reset()
                self.initial_synchronize()
                continue
            if size == self.known_size:
                time.sleep(poll_interval)
                continue

            assert self.f.tell() == self.known_size
            body = self.f.read(size - self.known_size)

            text = self.decode(body, self.known_size == 0)
            self.output_line(text)

            self.known_size += len(body)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='tail, but for gzip files (with Z_FULL_FLUSH)')
    parser.add_argument('-f', action="store_true", help='watch the file for changes')
    parser.add_argument('-n', type=int, help='output the last K lines', metavar='K', default=10)
    parser.add_argument('file', help="file name to watch")

    args = parser.parse_args()

    d = Decoder(args.file, args.n)
    try:
        d.initial()

        if args.f:
            d.follow()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop following; exit without a
        # traceback, using the conventional 128 + SIGINT status.
        sys.exit(130)
    finally:
        d.f.close()