comparison files.py @ 50:4ea7133ac25c

Converted 2.00 into the default branch
author Oleg Oshmyan <chortos@inbox.lv>
date Sun, 19 Dec 2010 23:25:13 +0200
parents 2.00/files.py@8fec38b0dd6e
children 1914ae9cfdce
comparison
equal deleted inserted replaced
47:06f1683c8db9 50:4ea7133ac25c
1 #! /usr/bin/env python
2 # Copyright (c) 2010 Chortos-2 <chortos@inbox.lv>
3
4 """File access routines and classes with support for archives."""
5
6 from __future__ import division, with_statement
7
# sys must be bound before the fallback below can run: the ImportError
# handler calls sys.exc_info(), and without this import it would die
# with a NameError instead of reporting the missing compat module.
import sys

try:
    from compat import *
except ImportError:
    # Hand the import failure over to the main script's import_error hook.
    import __main__
    __main__.import_error(sys.exc_info()[1])

# Kept after the wildcard import, as before, so that these module names
# are bound last and win over anything the wildcard import may export.
import contextlib
import os
import shutil
import sys
15
# File is the only name exported by a wildcard import of this module.
__all__ = ('File',)
18
# Both of the following are spelled with full stops whatever os.extsep
# happens to be; every full stop is converted to os.extsep on the fly.
archives = (
    'tests.tar',
    'tests.zip',
    'tests.tgz',
    'tests.tar.gz',
    'tests.tbz2',
    'tests.tar.bz2',
)
formats = dict()
23
class Archive(object):
    """
    Base class of the archive readers, doubling as a factory for them.

    Instantiating Archive itself looks the file name extension of the
    given path up in the module-level formats mapping and returns an
    instance of the class registered there.  Subclasses have to provide
    __init__ (which is expected to set self.file) and extract.
    """
    __slots__ = 'file'

    # ABCMeta is falsy when abstract base classes are unavailable
    if ABCMeta:
        __metaclass__ = ABCMeta

    def __new__(cls, path):
        """
        Create a new instance of the archive class corresponding
        to the file name in the given path.

        Raises LookupError if no suffix of the file name is a key
        of formats.
        """
        if cls is not Archive:
            # A concrete subclass was requested directly
            return object.__new__(cls)
        # Do this by hand rather than through os.path.splitext
        # because we support multi-dotted file name extensions:
        # try the longest extension first, then drop leading components
        ext = path.partition(os.path.extsep)[2]
        while ext:
            if ext in formats:
                return formats[ext](path)
            ext = ext.partition(os.path.extsep)[2]
        # The message used to reference an undefined name (filename),
        # which turned this LookupError into a NameError
        raise LookupError("unsupported archive file name extension in file name '%s'" % path)

    @abstractmethod
    def __init__(self, path): raise NotImplementedError

    @abstractmethod
    def extract(self, name, target): raise NotImplementedError

    def __del__(self):
        # Drop the reference to the underlying archive object.  The slot
        # is still empty if __init__ failed before assigning it, in which
        # case there is nothing to release.
        try:
            del self.file
        except AttributeError:
            pass
try:
    import tarfile
except ImportError:
    TarArchive = None
else:
    class TarArchive(Archive):
        """Archive implementation built on the tarfile module."""
        __slots__ = '__namelist'

        def __init__(self, path):
            """Open the tar archive located at path."""
            self.file = tarfile.open(path)

        def extract(self, name, target):
            """Extract the member called name to the file path target."""
            member = self.file.getmember(name)
            # tarfile extracts a member to the path stored in its name,
            # so the name is pointed at the target for the duration of
            # the extraction and restored afterwards; leaving it changed
            # would make later look-ups of the same member fail
            original_name = member.name
            member.name = target
            try:
                self.file.extract(member)
            finally:
                member.name = original_name

        # TODO: somehow automagically emulate universal line break support
        def open(self, name):
            """Return a file-like object reading the member called name."""
            return self.file.extractfile(name)

        def exists(self, queried_name):
            """
            Tell whether queried_name is a member of the archive
            or a '/'-separated ancestor directory of one.
            """
            # The cache attribute is stored under its mangled name, so
            # hasattr(self, '__namelist') with a plain string could never
            # find it and the set was rebuilt on every call
            try:
                namelist = self.__namelist
            except AttributeError:
                names = set()
                for name in self.file.getnames():
                    # Register the member and each of its parent directories
                    cutname = name
                    while cutname:
                        names.add(cutname)
                        cutname = cutname.rpartition('/')[0]
                namelist = self.__namelist = frozenset(names)
            return queried_name in namelist

        def __enter__(self):
            # Older tarfile objects are not context managers
            if hasattr(self.file, '__enter__'):
                self.file.__enter__()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if hasattr(self.file, '__exit__'):
                return self.file.__exit__(exc_type, exc_value, traceback)
            elif exc_type is None:
                self.file.close()
            else:
                # This code was shamelessly copied from tarfile.py of Python 2.7
                if not self.file._extfileobj:
                    self.file.fileobj.close()
                self.file.closed = True

    # Registered only when tarfile could be imported
    formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive
104
try:
    import zipfile
except ImportError:
    ZipArchive = None
else:
    class ZipArchive(Archive):
        """Archive implementation built on the zipfile module."""
        __slots__ = '__namelist'

        def __init__(self, path):
            """Open the ZIP archive located at path."""
            self.file = zipfile.ZipFile(path)

        def extract(self, name, target):
            """Extract the member called name to the file path target."""
            if os.path.isabs(target):
                # To my knowledge, this is as portable as it gets
                path = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
            else:
                path = None

            member = self.file.getinfo(name)
            # ZipFile.extract places a member according to its filename
            # relative to the extraction directory, so the filename is
            # pointed at the target for the duration of the extraction
            # and restored afterwards to keep namelist() truthful
            original_filename = member.filename
            # relpath is given os.curdir explicitly because not every
            # Python version accepts None as the start directory
            member.filename = os.path.relpath(target, path or os.curdir)
            try:
                # FIXME: 2.5 lacks ZipFile.extract
                self.file.extract(member, path)
            finally:
                member.filename = original_filename

        def open(self, name):
            """Return a file-like object reading the member called name."""
            # NOTE(review): ZipFile.open() no longer accepts the 'U' modes
            # as of Python 3.6; confirm which versions must be supported
            return self.file.open(name, 'rU')

        def exists(self, queried_name):
            """
            Tell whether queried_name is a member of the archive
            or a '/'-separated ancestor directory of one.
            """
            # The cache attribute is stored under its mangled name, so
            # hasattr(self, '__namelist') with a plain string could never
            # find it and the set was rebuilt on every call
            try:
                namelist = self.__namelist
            except AttributeError:
                names = set()
                for name in self.file.namelist():
                    # Register the member and each of its parent directories
                    cutname = name
                    while cutname:
                        names.add(cutname)
                        cutname = cutname.rpartition('/')[0]
                namelist = self.__namelist = frozenset(names)
            return queried_name in namelist

        def __enter__(self):
            # Older ZipFile objects are not context managers
            if hasattr(self.file, '__enter__'):
                self.file.__enter__()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if hasattr(self.file, '__exit__'):
                return self.file.__exit__(exc_type, exc_value, traceback)
            else:
                return self.file.close()

    # Registered only when zipfile could be imported
    formats['zip'] = ZipArchive
154
# Drop the archive names whose format has not been registered and swap
# the full stops for the platform-dependent file name extension separator
def issupported(filename, formats=formats):
    """Tell whether any multi-dotted extension of filename is in formats."""
    components = filename.split('.')
    # Longest extension first: everything after the first full stop,
    # then after the second one, and so on; empty extensions never count
    suffixes = ('.'.join(components[start:]) for start in range(1, len(components)))
    return any(suffix and suffix in formats for suffix in suffixes)
archives = [filename.replace('.', os.path.extsep) for filename in archives if issupported(filename)]
formats = dict((ext.replace('.', os.path.extsep), archive_class) for ext, archive_class in items(formats))
165
# Archive objects created so far, keyed by the path they were opened with
open_archives = {}

def open_archive(path):
    """
    Return the archive object for path, creating and remembering it
    on the first request so that each archive is opened only once.
    """
    if path not in open_archives:
        open_archives[path] = Archive(path)
    return open_archives[path]
174
# File locates a file given by a '/'-separated virtual path, either on disk
# or inside one of the test archives, and offers open() and copy() for it.
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below cannot be read off the text; check it against the
# repository before changing any logic.
175 class File(object):
176 __slots__ = 'virtual_path', 'real_path', 'full_real_path', 'archive'
177
# Resolve virtpath right away.  Each '/'-separated component has its full
# stops replaced with os.extsep before the search starts from an empty root.
# Raises IOError (mentioning msg and virtpath) if nothing is found.
178 def __init__(self, virtpath, allow_root=False, msg='test data'):
179 self.virtual_path = virtpath
180 self.archive = None
181 if not self.realize_path('', tuple(comp.replace('.', os.path.extsep) for comp in virtpath.split('/')), allow_root):
182 raise IOError("%s file '%s' could not be found" % (msg, virtpath))
183
# Recursive search on disk.  root is the directory examined so far and
# virtpath the tuple of components still to be matched; hastests is passed
# as True by the recursion that descends into a 'tests' directory.
# Returns True after setting real_path and full_real_path (and, through
# realize_path_archive, archive) on success, and False otherwise.
184 def realize_path(self, root, virtpath, allow_root=False, hastests=False):
# A non-empty root that does not exist cannot contain anything
185 if root and not os.path.exists(root):
186 return False
# More than one component left: try descending into the first component,
# a 'tests' directory, the archives found in root, or dropping the first
# component altogether
187 if len(virtpath) > 1:
188 if self.realize_path(os.path.join(root, virtpath[0]), virtpath[1:], allow_root, hastests):
189 return True
190 elif not hastests:
191 if self.realize_path(os.path.join(root, 'tests'), virtpath, allow_root, True):
192 return True
193 for archive in archives:
194 path = os.path.join(root, archive)
195 if os.path.exists(path):
196 if self.realize_path_archive(open_archive(path), '', virtpath, path):
197 return True
198 if self.realize_path(root, virtpath[1:], allow_root, hastests):
199 return True
# Last component: look for it under root/tests, inside the archives found
# in root and, if hastests or allow_root is set, directly under root
200 else:
201 if not hastests:
202 path = os.path.join(root, 'tests', virtpath[0])
203 if os.path.exists(path):
204 self.full_real_path = self.real_path = path
205 return True
206 for archive in archives:
207 path = os.path.join(root, archive)
208 if os.path.exists(path):
209 if self.realize_path_archive(open_archive(path), '', virtpath, path):
210 return True
211 if hastests or allow_root:
212 path = os.path.join(root, virtpath[0])
213 if os.path.exists(path):
214 self.full_real_path = self.real_path = path
215 return True
216 return False
217
# Recursive search inside an archive.  root is the '/'-joined member path
# matched so far, virtpath the components still to be matched and archpath
# the path of the archive file itself.  On success archive and real_path
# are set, and full_real_path becomes archpath joined with the member's
# components.  Returns True on success and False otherwise.
218 def realize_path_archive(self, archive, root, virtpath, archpath):
219 if root and not archive.exists(root):
220 return False
# Member names inside archives always use '/' as the separator
221 if root: path = ''.join((root, '/', virtpath[0]))
222 else: path = virtpath[0]
223 if len(virtpath) > 1:
# Either descend into the first component or drop it
224 if self.realize_path_archive(archive, path, virtpath[1:], archpath):
225 return True
226 elif self.realize_path_archive(archive, root, virtpath[1:], archpath):
227 return True
228 else:
229 if archive.exists(path):
230 self.archive = archive
231 self.real_path = path
232 self.full_real_path = os.path.join(archpath, *path.split('/'))
233 return True
234 return False
235
# Return an object for reading the file that can be used in a with statement
236 def open(self):
237 if self.archive:
238 file = self.archive.open(self.real_path)
# Wrap objects that are not context managers so that they get closed
239 if hasattr(file, '__exit__'):
240 return file
241 else:
242 return contextlib.closing(file)
243 else:
244 return open(self.real_path)
245
# Place a copy of the file at the path target, extracting it from the
# archive if that is where it was found
246 def copy(self, target):
247 if self.archive:
248 self.archive.extract(self.real_path, target)
249 else:
250 shutil.copy(self.real_path, target)