1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
#!/usr/bin/env python3
import time
import os
import sys
import zlib
import struct
# Number of compressed bytes read from the end of the file when first
# synchronising with the stream.
DECOMPRESS_BUF_SIZE = 4 * 1024 * 1024
# Byte pattern searched for in the compressed data to find a point where
# decoding can start mid-file (the tail of the empty stored block that a
# deflate sync/full flush emits).  Must be bytes: the file is read in
# binary mode and bytes.find() rejects text patterns on Python 3.
BLOCK_MAGIC = b"\x00\x00\xFF\xFF"
# Line terminator used to split the decompressed output, which is bytes too.
ENDLINE_MAGIC = b"\n"
# we treat everything as a deflate stream
# gzip header has no power here
def strip_gzip_header(body):
    """Return ``body`` with its leading gzip member header removed.

    The fixed 10-byte header is skipped, followed by whichever optional
    fields the FLG byte announces (extra field, file name, comment, header
    CRC).  What remains starts at the raw deflate data.

    Args:
        body: bytes beginning at the very start of a gzip member.

    Returns:
        The bytes following the header.

    Raises:
        ValueError: if ``body`` does not start with the gzip magic number,
            is too short to hold the header, or a NUL-terminated field has
            no terminator.
    """
    # Bytes literals are required here: comparing bytes with text is always
    # unequal on Python 3, which would reject every valid file.
    if body[:2] != b"\x1f\x8b":
        raise ValueError("not a gzip stream: bad magic number")
    if len(body) < 10:
        raise ValueError("truncated gzip header")
    # Layout after the magic: CM, FLG, MTIME (4 bytes), XFL, OS.
    # Only the flags matter for locating the end of the header.
    _method, flags, _mtime = struct.unpack("<BBIxx", body[2:10])

    FHCRC = 0x02
    FEXTRA = 0x04
    FNAME = 0x08
    FCOMMENT = 0x10

    pos = 10
    if flags & FEXTRA:
        if len(body) < pos + 2:
            raise ValueError("truncated gzip header: extra field length")
        (extra_len,) = struct.unpack("<H", body[pos:pos + 2])
        pos += 2 + extra_len
    # File name and comment are NUL-terminated; index() searches for a
    # one-byte pattern, so it behaves the same for Python 2 str and
    # Python 3 bytes (indexing bytes yields ints on Python 3).
    if flags & FNAME:
        pos = body.index(b"\x00", pos) + 1
    if flags & FCOMMENT:
        pos = body.index(b"\x00", pos) + 1
    if flags & FHCRC:
        pos += 2
    return body[pos:]
def _as_bytes(value):
    """Return ``value`` as bytes, encoding text one code point per byte."""
    if isinstance(value, bytes):
        return value
    return value.encode("latin-1")


class Decoder(object):
    """Print the tail of a gzip file and optionally follow it as it grows.

    The compressed data is treated as a raw deflate stream.  Decoding starts
    either at the beginning of the file (after the gzip header) or, for
    large files, right after the first BLOCK_MAGIC marker found in the last
    DECOMPRESS_BUF_SIZE bytes.

    Attributes are plain instance fields: ``f`` (binary file object),
    ``last_n_lines``, ``sync`` (whether a decodable position has been
    found), ``zstream`` (the decompressor, or None once the stream ended)
    and ``known_size`` (file size already consumed, None before initial()).
    """

    def __init__(self, fname, last_n_lines):
        """Open ``fname`` for reading and prepare a fresh decompressor."""
        self.f = open(fname, "rb")
        self.last_n_lines = last_n_lines
        # None until initial() runs; follow() relies on this to detect misuse.
        self.known_size = None
        self.reset()

    def close(self):
        """Close the underlying file."""
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def reset(self):
        """Drop synchronisation and start over with a new raw-deflate decoder."""
        self.sync = False
        # zstream is missing on first use and None after the stream ended.
        previous = getattr(self, "zstream", None)
        if previous is not None:
            previous.flush()
        self.zstream = zlib.decompressobj(-zlib.MAX_WBITS)

    def decode(self, bytes, if_start=False):
        """Decompress a chunk of the file and return the decoded bytes.

        Args:
            bytes: compressed data read from the file (the name shadows the
                builtin but is kept so keyword callers keep working).
            if_start: True when the chunk begins at offset 0 of the file, in
                which case the gzip header is stripped first.

        Returns:
            The decompressed bytes; empty when there is no input, when no
            sync marker has been found yet, or when the stream has ended.
        """
        data = bytes
        if not data or self.zstream is None:
            return b""
        if if_start:
            self.sync = True
            data = strip_gzip_header(data)
        elif not self.sync:
            marker = _as_bytes(BLOCK_MAGIC)
            found = data.find(marker)
            if found == -1:
                # Not in sync, nothing can be decoded from this chunk.
                return b""
            data = data[found + len(marker):]
            self.sync = True
        text = self.zstream.decompress(data)
        if len(self.zstream.unused_data) == 8:
            # Eight leftover bytes usually are the gzip trailer (checksum
            # and length); neither is verified.  The stream is finished.
            self.zstream.flush()
            self.zstream = None
        return text

    def output_line(self, line):
        """Write ``line`` to standard output and flush immediately."""
        binary = getattr(sys.stdout, "buffer", None)
        if binary is not None:
            # Python 3: decoded data is bytes and must bypass the text layer.
            binary.write(line)
            binary.flush()
            return
        # Python 2, or a text-only replacement for sys.stdout.
        if not isinstance(line, str):
            line = line.decode("utf-8", "replace")
        sys.stdout.write(line)
        sys.stdout.flush()

    def initial_synchronize(self):
        """Decode the last DECOMPRESS_BUF_SIZE bytes of the file.

        Records the current end of file in ``known_size`` and returns the
        decoded bytes.
        """
        f = self.f
        f.seek(0, os.SEEK_END)
        end = f.tell()
        start = max(0, end - DECOMPRESS_BUF_SIZE)
        f.seek(start, os.SEEK_SET)
        body = f.read(end - start)
        text = self.decode(body, start == 0)
        self.known_size = end
        return text

    @staticmethod
    def _last_lines(text, count, newline):
        """Return the last ``count`` lines of ``text``, terminators included."""
        # A trailing terminator closes the final line instead of starting an
        # empty one, so it is excluded from the backwards search.
        end = len(text)
        if text.endswith(newline):
            end -= len(newline)
        pos = end
        for _ in range(count):
            pos = text.rfind(newline, 0, pos)
            if pos == -1:
                # Fewer than ``count`` lines available: keep everything.
                return text
        return text[pos + len(newline):]

    def initial(self):
        """Print the last ``last_n_lines`` lines currently in the file."""
        text = self.initial_synchronize()
        newline = _as_bytes(ENDLINE_MAGIC)
        self.output_line(self._last_lines(text, self.last_n_lines, newline))

    def follow(self):
        """Print data appended to the file until the compressed stream ends.

        Polls the file size once per second.  If the file shrinks, a notice
        is written to stderr and decoding is resynchronised.

        Raises:
            RuntimeError: if initial() has not been called first.
        """
        if self.known_size is None:
            raise RuntimeError("Call initial() first.")
        while self.zstream is not None:
            size = os.fstat(self.f.fileno()).st_size
            if self.known_size > size:
                sys.stderr.write("%s: file truncated\n" % sys.argv[0])
                sys.stderr.write("%s: waiting for the next write\n" % sys.argv[0])
                sys.stderr.flush()
                if self.sync:
                    self.sync = False
                    self.zstream.flush()
                    self.zstream = zlib.decompressobj(-zlib.MAX_WBITS)
                # Whatever is decoded while resynchronising is discarded, as
                # before; only data written after this point is printed.
                self.initial_synchronize()
                continue
            if self.known_size == size:
                time.sleep(1)
                continue
            assert self.f.tell() == self.known_size
            body = self.f.read(size - self.known_size)
            self.output_line(self.decode(body, self.known_size == 0))
            self.known_size = size
def main(argv=None):
    """Command-line entry point: print the tail of a gzip file.

    Args:
        argv: argument list without the program name; defaults to
            ``sys.argv[1:]`` when None.

    Returns:
        Process exit status: 0 on normal completion, 130 when following
        was interrupted from the keyboard.
    """
    import argparse

    parser = argparse.ArgumentParser(description='tail, but for gzip files (with Z_FULL_SYNC)')
    parser.add_argument('-f', action="store_true", help='watch the file for changes')
    parser.add_argument('-n', type=int, help='output the last K lines', metavar='K', default=10)
    parser.add_argument('file', help="file name to watch")
    args = parser.parse_args(argv)

    decoder = Decoder(args.file, args.n)
    try:
        decoder.initial()
        if args.f:
            decoder.follow()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop following; exit without a traceback.
        return 130
    finally:
        # Release the file handle on every exit path.
        decoder.f.close()
    return 0


if __name__ == "__main__":
    sys.exit(main())
|