File indexing completed on 2024-04-06 12:10:15
0001
0002
0003 import time
0004 import os
0005 import sys
0006 import zlib
0007 import struct
0008
# Maximum number of trailing bytes of the compressed file that
# Decoder.initial_synchronize() reads when it first attaches to the file.
DECOMPRESS_BUF_SIZE = 4 * 1024 * 1024

# Marker Decoder.decode() searches for to find a point in the middle of the
# compressed data where inflation can begin.  These four bytes are the tail
# of the empty stored block zlib emits on a sync/full flush.
# Bytes literals: the data they are compared with is read from a file opened
# in binary mode, so text literals would never match on Python 3.
BLOCK_MAGIC = b"\x00\x00\xFF\xFF"

# Line separator used to count lines in the decompressed output.
ENDLINE_MAGIC = b"\n"
0013
0014
0015
def strip_gzip_header(body):
    """Return *body* with its leading gzip member header removed.

    The fixed 10-byte header is parsed first; the optional fields are then
    skipped in the order the gzip format stores them (FEXTRA, FNAME,
    FCOMMENT, FHCRC).  Whatever follows the header is returned unchanged.

    Args:
        body: Byte string that starts with a gzip header.

    Returns:
        The bytes of *body* that follow the header.

    Raises:
        AssertionError: *body* does not start with the gzip magic number.
        struct.error: *body* is too short to contain the fixed header or
            the FEXTRA length field.
        IndexError: an FNAME/FCOMMENT field has no terminating NUL byte.
    """
    # Raised explicitly (rather than via ``assert``) so the check survives
    # ``python -O``; the exception type is kept for existing callers.
    if body[0:2] != b"\x1f\x8b":
        raise AssertionError("not a gzip stream (bad magic number)")

    # Only the flag byte is needed; method and mtime are parsed and ignored.
    _method, flags, _mtime = struct.unpack_from("<BBIxx", body, 2)

    FHCRC = 0x02
    FEXTRA = 0x04
    FNAME = 0x08
    FCOMMENT = 0x10

    pos = 10  # size of the fixed part of the header

    if flags & FEXTRA:
        # Two-byte little-endian length, followed by that many bytes.
        extra_len, = struct.unpack_from("<H", body, pos)
        pos += 2 + extra_len

    def skip_until_zero(ix):
        """Return the index just past the next NUL byte at or after *ix*."""
        end = body.find(b"\x00", ix)
        if end == -1:
            raise IndexError("unterminated string field in gzip header")
        return end + 1

    if flags & FNAME:
        pos = skip_until_zero(pos)
    if flags & FCOMMENT:
        pos = skip_until_zero(pos)
    if flags & FHCRC:
        pos += 2  # 16-bit header checksum

    return body[pos:]
0041
class Decoder(object):
    """Print the tail of a gzip file and optionally follow it as it grows.

    The file is inflated as a raw deflate stream.  When reading starts at
    offset 0 the gzip header is stripped; otherwise the data is scanned for
    BLOCK_MAGIC and inflation begins right after the first occurrence.

    Attributes:
        f: The compressed file, opened in binary mode.
        last_n_lines: Number of lines initial() prints.
        known_size: Compressed size already consumed, or None until
            initial_synchronize() has run.
        sync: True once a valid starting point for inflation was found.
        zstream: The active decompressor, or None after decode() decided
            the compressed stream has ended.
    """

    def __init__(self, fname, last_n_lines):
        """Open *fname* for reading and prepare a fresh decompressor."""
        self.f = open(fname, "rb")
        self.last_n_lines = last_n_lines
        # Must exist before follow() tests it; otherwise calling follow()
        # before initial() fails with AttributeError instead of the
        # intended "Call initial() first." error.
        self.known_size = None
        self.reset()

    def close(self):
        """Close the underlying file."""
        self.f.close()

    def reset(self):
        """Drop the synchronisation state and start a new decompressor."""
        self.sync = False
        previous = getattr(self, "zstream", None)
        # zstream is None once decode() has seen the end of the stream, so
        # checking for attribute presence alone is not enough.
        if previous is not None:
            previous.flush()

        self.zstream = zlib.decompressobj(-zlib.MAX_WBITS)

    def decode(self, bytes, if_start=False):
        """Inflate one chunk of compressed data.

        Args:
            bytes: Compressed chunk read from the file.  (The name shadows
                the builtin; it is kept so keyword callers keep working.)
            if_start: True when the chunk begins at offset 0 of the file,
                i.e. it starts with the gzip header.

        Returns:
            The decompressed bytes, or an empty byte string when the chunk
            is empty or no starting point has been found yet.
        """
        if not bytes:
            return b""

        if if_start:
            self.sync = True
            bytes = strip_gzip_header(bytes)
        elif not self.sync:
            # NOTE(review): a marker split across two chunks is not found,
            # and a chance occurrence inside compressed data would be
            # taken as a starting point.
            marker_at = bytes.find(BLOCK_MAGIC)
            if marker_at != -1:
                bytes = bytes[marker_at + len(BLOCK_MAGIC):]
                self.sync = True

        if not self.sync:
            return b""

        text = self.zstream.decompress(bytes)

        # Exactly eight bytes left over after the deflate data is treated
        # as end of stream (presumably the gzip CRC32 + ISIZE trailer).
        # Clearing zstream is what makes follow() stop.
        if len(self.zstream.unused_data) == 8:
            self.zstream.flush()
            self.zstream = None

        return text

    def output_line(self, line):
        """Write *line* to standard output and flush immediately."""
        # Decompressed data is bytes; on Python 3 it has to go to the
        # binary layer of stdout.  Python 2's stdout has no ``buffer``.
        out = getattr(sys.stdout, "buffer", sys.stdout)
        out.write(line)
        out.flush()

    def initial_synchronize(self):
        """Decode at most the last DECOMPRESS_BUF_SIZE bytes of the file.

        Records the current end of file in known_size and leaves the file
        position there.

        Returns:
            The decompressed bytes obtained from that trailing window.
        """
        f = self.f

        f.seek(0, 2)
        end = f.tell()
        start = max(0, end - DECOMPRESS_BUF_SIZE)
        f.seek(start, 0)

        body = f.read(end - start)
        text = self.decode(body, start == 0)

        self.known_size = end
        return text

    @staticmethod
    def _tail_lines(text, n_lines, sep):
        """Return the last *n_lines* lines of *text*, separated by *sep*.

        Anything after the final separator (an unfinished line) is kept in
        addition to the *n_lines* complete lines.
        """
        n_lines = max(n_lines, 0)
        pieces = text.rsplit(sep, n_lines + 1)
        # rsplit yields n_lines + 2 pieces only when there is something
        # older than the wanted lines; that first piece is what to drop.
        # (Comparing against n_lines dropped a wanted line whenever the
        # text held exactly n_lines lines.)
        if len(pieces) > n_lines + 1:
            pieces = pieces[1:]
        return sep.join(pieces)

    def initial(self):
        """Synchronise with the file and print its last last_n_lines lines."""
        text = self.initial_synchronize()
        self.output_line(
            self._tail_lines(text, self.last_n_lines, ENDLINE_MAGIC))

    def follow(self, poll_interval=1):
        """Print data appended to the file until the stream ends.

        Args:
            poll_interval: Seconds to sleep between size checks while the
                file is not growing.

        Raises:
            Exception: initial() has not been called yet.
        """
        if self.known_size is None:
            raise Exception("Call initial() first.")

        while self.zstream is not None:
            size = os.fstat(self.f.fileno()).st_size

            if self.known_size > size:
                sys.stderr.write("%s: file truncated\n" % sys.argv[0])
                sys.stderr.write("%s: waiting for the next write\n" % sys.argv[0])
                sys.stderr.flush()

                self.reset()

                # Re-attach to the new contents.  What is already in the
                # file is decoded only to prime the decompressor; it is
                # not printed (matches the original behaviour).
                self.initial_synchronize()
                continue
            elif self.known_size == size:
                time.sleep(poll_interval)
                continue

            # Internal invariant: nothing else moves the file position.
            assert self.f.tell() == self.known_size
            body = self.f.read(size - self.known_size)

            text = self.decode(body, self.known_size == 0)
            self.output_line(text)

            self.known_size = size
0141
def main():
    """Parse the command line, print the file's tail and optionally follow it."""
    import argparse
    parser = argparse.ArgumentParser(description='tail, but for gzip files (with Z_FULL_FLUSH)')
    parser.add_argument('-f', action="store_true", help='watch the file for changes')
    parser.add_argument('-n', type=int, help='output the last K lines', metavar='K', default=10)
    parser.add_argument('file', help="file name to watch")

    args = parser.parse_args()

    d = Decoder(args.file, args.n)
    try:
        d.initial()

        if args.f:
            d.follow()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave follow mode: exit with the
        # conventional SIGINT status instead of printing a traceback.
        sys.exit(130)
    finally:
        # The Decoder keeps its input file open for its whole lifetime.
        d.f.close()


if __name__ == "__main__":
    main()
0155 d.follow()