Mercurial > ~astiob > upreckon > hgweb
comparison 2.00/files.py @ 21:ec6f1a132109
A pretty usable version
Test groups and testconfs in non-ZIP archives or ZIP archives with comments are not yet supported.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Fri, 06 Aug 2010 15:39:29 +0000 |
parents | f2279b7602d3 |
children | c1f52b5d80d6 |
comparison
equal
deleted
inserted
replaced
20:5bfa23cd638d | 21:ec6f1a132109 |
---|---|
1 #!/usr/bin/python | 1 #! /usr/bin/env python |
2 # Copyright (c) 2010 Chortos-2 <chortos@inbox.lv> | 2 # Copyright (c) 2010 Chortos-2 <chortos@inbox.lv> |
3 | 3 |
4 import os | 4 """File access routines and classes with support for archives.""" |
5 tasknames = (os.path.curdir,) | 5 |
6 | 6 from __future__ import division, with_statement |
7 class Files(object): | 7 |
8 __slots__ = 'name', 'paths' | 8 try: |
9 stdpaths = '%/', '%/^:%/', '%/^:', 'tests/%/', 'tests/', '^:%/', '^:', '' | 9 from compat import * |
10 | 10 except ImportError: |
11 def __init__(self, name, paths = stdpaths): | 11 import __main__ |
12 self.name = name | 12 __main__.import_error(sys.exc_info()[1]) |
13 self.paths = paths | 13 |
14 | 14 import contextlib, os, shutil, sys |
15 def __iter__(self): | 15 |
16 for path in paths: | 16 # You don't need to know about anything else. |
17 p = getpath(path, self.name) | 17 __all__ = 'File', |
18 if isfile(p): | 18 |
19 yield p | 19 # In these two variables, use full stops no matter what os.extsep is; |
20 | 20 # all full stops will be converted to os.extsep on the fly |
21 def isfile(path): | 21 archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2' |
22 return os.path.isfile(path) | 22 formats = {} |
23 | 23 |
24 def getpath(path, name): | 24 class Archive(object): |
25 return path + name | 25 __slots__ = 'file' |
26 | |
27 if ABCMeta: | |
28 __metaclass__ = ABCMeta | |
29 | |
def __new__(cls, path):
    """
    Create a new instance of the archive class corresponding
    to the file name in the given path.

    When called on a subclass, a plain instance of that subclass is
    created.  When called on Archive itself, every dot-separated suffix
    of path is looked up in the formats registry, longest first, and
    the first registered class is instantiated with path.

    Raises LookupError if no suffix of path is a registered format.
    """
    if cls is not Archive:
        return object.__new__(cls)
    # Do this by hand rather than through os.path.splitext
    # because we support multi-dotted file name extensions
    ext = path.partition(os.path.extsep)[2]
    while ext:
        if ext in formats:
            return formats[ext](path)
        ext = ext.partition(os.path.extsep)[2]
    # The message must be built from path; no other name is in scope here
    raise LookupError("unsupported archive file name extension in file name '%s'" % path)
46 | |
@abstractmethod
def __init__(self, path):
    """Open the archive at path; concrete subclasses must override this."""
    raise NotImplementedError
49 | |
@abstractmethod
def extract(self, name, target):
    """Extract member name to target; concrete subclasses must override this."""
    raise NotImplementedError
52 | |
def __del__(self):
    """Drop the reference to the underlying archive file object."""
    try:
        del self.file
    except AttributeError:
        # The file attribute is missing if __init__ raised before
        # assigning it; there is nothing to release in that case
        pass
55 | |
56 try: | |
57 import tarfile | |
58 | |
class TarArchive(Archive):
    """Archive implementation backed by the standard tarfile module."""

    __slots__ = '__namelist'

    def __init__(self, path):
        """Open the tar archive at path with tarfile.open()."""
        self.file = tarfile.open(path)

    def extract(self, name, target):
        """Extract the member called name to the file system path target."""
        import copy
        # Rename a copy: the TarInfo returned by getmember() is the one
        # cached inside the TarFile, and renaming it in place would make
        # later lookups of the same member by its real name fail
        member = copy.copy(self.file.getmember(name))
        member.name = target
        self.file.extract(member)

    # TODO: somehow automagically emulate universal line break support
    def open(self, name):
        """Return a file-like object for reading the member called name."""
        return self.file.extractfile(name)

    def exists(self, queried_name):
        """
        Return True if queried_name is a member of the archive
        or a leading directory part of some member's name.
        """
        # The attribute is name-mangled, so hasattr(self, '__namelist')
        # would never find it; probe it through normal attribute access
        try:
            namelist = self.__namelist
        except AttributeError:
            names = set()
            for name in self.file.getnames():
                # Register the member itself and each of its parents
                cutname = name
                while cutname:
                    names.add(cutname)
                    cutname = cutname.rpartition('/')[0]
            namelist = self.__namelist = frozenset(names)
        return queried_name in namelist

    def __enter__(self):
        """Enter the underlying TarFile's context if it has one; return self."""
        if hasattr(self.file, '__enter__'):
            self.file.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the archive, delegating to TarFile.__exit__ when available."""
        if hasattr(self.file, '__exit__'):
            return self.file.__exit__(exc_type, exc_value, traceback)
        elif exc_type is None:
            self.file.close()
        else:
            # This code was shamelessly copied from tarfile.py of Python 2.7
            if not self.file._extfileobj:
                self.file.fileobj.close()
            self.file.closed = True
100 | |
101 formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive | |
102 except ImportError: | |
103 TarArchive = None | |
104 | |
105 try: | |
106 import zipfile | |
107 | |
class ZipArchive(Archive):
    """Archive implementation backed by the standard zipfile module."""

    __slots__ = '__namelist'

    def __init__(self, path):
        """Open the ZIP archive at path for reading."""
        self.file = zipfile.ZipFile(path)

    def extract(self, name, target):
        """Extract the member called name to the file system path target."""
        import copy
        # FIXME: 2.5 lacks os.path.relpath
        if os.path.isabs(target):
            # To my knowledge, this is as portable as it gets
            path = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
            filename = os.path.relpath(target, path)
        else:
            # Do not pass None as the start directory: Python 2's
            # os.path.relpath does not accept it
            path = None
            filename = os.path.relpath(target)

        # Rename a copy so that the ZipInfo kept by the ZipFile,
        # and hence namelist(), keeps the member's real name
        member = copy.copy(self.file.getinfo(name))
        member.filename = filename
        # FIXME: 2.5 lacks ZipFile.extract
        self.file.extract(member, path)

    def open(self, name):
        """Return a file-like object for reading the member called name."""
        return self.file.open(name, 'rU')

    def exists(self, queried_name):
        """
        Return True if queried_name is a member of the archive
        or a leading directory part of some member's name.
        """
        # The attribute is name-mangled, so hasattr(self, '__namelist')
        # would never find it; probe it through normal attribute access
        try:
            namelist = self.__namelist
        except AttributeError:
            names = set()
            for name in self.file.namelist():
                # Register the member itself and each of its parents
                cutname = name
                while cutname:
                    names.add(cutname)
                    cutname = cutname.rpartition('/')[0]
            namelist = self.__namelist = frozenset(names)
        return queried_name in namelist

    def __enter__(self):
        """Enter the underlying ZipFile's context if it has one; return self."""
        if hasattr(self.file, '__enter__'):
            self.file.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the archive, delegating to ZipFile.__exit__ when available."""
        if hasattr(self.file, '__exit__'):
            return self.file.__exit__(exc_type, exc_value, traceback)
        else:
            return self.file.close()
151 | |
152 formats['zip'] = ZipArchive | |
153 except ImportError: | |
154 ZipArchive = None | |
155 | |
156 # Remove unsupported archive formats and replace full stops | |
157 # with the platform-dependent file name extension separator | |
def issupported(filename, formats=formats):
    """
    Return True if the text following any full stop in filename
    is a key of formats, and False otherwise.
    """
    dot = filename.find('.')
    while dot != -1:
        ext = filename[dot + 1:]
        # An empty remainder is never looked up in formats
        if ext and ext in formats:
            return True
        dot = filename.find('.', dot + 1)
    return False
# Keep only the archive names whose format is available, then spell both
# the names and the format keys with the platform's extension separator
archives = [filename.replace('.', os.path.extsep)
            for filename in archives
            if issupported(filename)]
formats = dict((ext.replace('.', os.path.extsep), archive_class)
               for ext, archive_class in items(formats))
166 | |
167 open_archives = {} | |
168 | |
def open_archive(path):
    """
    Return the archive object for path, creating it and remembering it
    in open_archives the first time the path is requested.
    """
    try:
        return open_archives[path]
    except KeyError:
        archive = open_archives[path] = Archive(path)
        return archive
175 | |
class File(object):
    """
    A file looked up by a slash-separated virtual path, either directly
    on disk or inside one of the archives listed in archives.

    Attributes set by a successful lookup:
    virtual_path   -- the path the file was requested by
    real_path      -- the path on disk, or the member name in the archive
    full_real_path -- real_path, or the archive path joined with the
                      member name when the file is inside an archive
    archive        -- the containing archive object, or None
    """

    __slots__ = 'virtual_path', 'real_path', 'full_real_path', 'archive'

    def __init__(self, virtpath, allow_root=False, msg='test data'):
        """
        Locate virtpath; raise IOError, with msg naming the kind
        of file, if it cannot be found.
        """
        self.virtual_path = virtpath
        self.archive = None
        components = tuple(comp.replace('.', os.path.extsep)
                           for comp in virtpath.split('/'))
        if not self.realize_path('', components, allow_root):
            raise IOError("%s file '%s' could not be found" % (msg, virtpath))

    def _realize_in_archives(self, root, virtpath):
        """Try every known archive name inside root; return True on a hit."""
        for archive in archives:
            path = os.path.join(root, archive)
            if os.path.exists(path) and self.realize_path_archive(open_archive(path), '', virtpath, path):
                return True
        return False

    def realize_path(self, root, virtpath, allow_root=False, hastests=False):
        """
        Search the file system below root for the components in virtpath.

        hastests tells whether root is already inside a 'tests' directory;
        allow_root permits the last component to sit directly in root.
        Returns True and fills in the path attributes on success.
        """
        # NOTE(review): block nesting was reconstructed from a listing
        # that had lost its indentation -- confirm against the repository
        if root and not os.path.exists(root):
            return False

        rest = virtpath[1:]
        if rest:
            if self.realize_path(os.path.join(root, virtpath[0]), rest, allow_root, hastests):
                return True
            if hastests:
                # Already inside tests: retry without the leading component
                return self.realize_path(root, rest, allow_root, hastests)
            if self.realize_path(os.path.join(root, 'tests'), virtpath, allow_root, True):
                return True
            return self._realize_in_archives(root, virtpath)

        # Only the final component is left
        if not hastests:
            candidate = os.path.join(root, 'tests', virtpath[0])
            if os.path.exists(candidate):
                self.full_real_path = self.real_path = candidate
                return True
            if self._realize_in_archives(root, virtpath):
                return True
        if hastests or allow_root:
            candidate = os.path.join(root, virtpath[0])
            if os.path.exists(candidate):
                self.full_real_path = self.real_path = candidate
                return True
        return False

    def realize_path_archive(self, archive, root, virtpath, archpath):
        """
        Search archive below the member name root for the components in
        virtpath; archpath is the archive's own path on disk.
        Returns True and fills in the path attributes on success.
        """
        if root:
            if not archive.exists(root):
                return False
            path = root + '/' + virtpath[0]
        else:
            path = virtpath[0]

        rest = virtpath[1:]
        if rest:
            # Descend into the leading component, or else skip it
            return (self.realize_path_archive(archive, path, rest, archpath)
                    or self.realize_path_archive(archive, root, rest, archpath))

        if not archive.exists(path):
            return False
        self.archive = archive
        self.real_path = path
        self.full_real_path = os.path.join(archpath, *path.split('/'))
        return True

    def open(self):
        """Return an object usable in a with statement for reading the file."""
        if not self.archive:
            return open(self.real_path)
        stream = self.archive.open(self.real_path)
        if hasattr(stream, '__exit__'):
            return stream
        # Give plain file-like objects a context manager interface
        return contextlib.closing(stream)

    def copy(self, target):
        """Copy the file, or extract it from its archive, to target."""
        if not self.archive:
            shutil.copy(self.real_path, target)
        else:
            self.archive.extract(self.real_path, target)