Mercurial > hg > PaInt
view paint/package.py @ 73:5b10f2c03cb9
more stubbing of something really stupid that is really hard
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Sun, 27 Jan 2013 18:19:14 -0800 |
parents | 017b75cd61d8 |
children | ab0620d3755b |
line wrap: on
line source
""" package model for python PAckage INTrospection """

# TODO: use pkginfo.sdist more

import info
import os
import pip
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib2
import urlparse
import utils

from subprocess import check_call as call

__all__ = ['Package']


class Package(object):
    """
    class for python package introspection.
    constructor takes the package 'src'
    """

    def __init__(self, src, verbose=True):
        """
        - src : URL or filesystem path to the package
        - verbose : if true, progress messages are printed (see _log)
        """
        self.src = src
        self.verbose = verbose

        # ephemeral data
        self._tmppath = None
        self._egg_info_path = None
        self._build_path = None
        self._pkg_info_path = None

        # package metadata provider
        # TODO: this should be chooseable but currently the keys of the
        # interfaces are not the same
        self.package_info = info.EggInfo

        # TODO: list of temporary files/directories to be deleted

    def _log(self, message):
        """print a '>>> '-prefixed progress message when verbose"""
        if self.verbose:
            # single-argument parenthesised form prints identically
            # whether `print` is a statement or a function
            print('>>> %s' % message)

    def _path(self):
        """
        filesystem path to package directory

        If src is a URL it is fetched to a temporary file first; if the
        (fetched or local) file is an archive it is unpacked to a temporary
        directory whose path is cached and returned.  Otherwise src itself
        is returned unchanged.
        """
        self._log(">>> _path:Getting path to package")

        # return cached copy if it exists
        if self._tmppath:
            return self._tmppath

        # fetch from the web if a URL
        tmpfile = None
        src = self.src
        if utils.isURL(self.src):
            tmpfile = src = self.fetch()

        try:
            # unpack if an archive
            if self._is_archive(src):
                self._unpack(src)
                return self._tmppath
        finally:
            # the download is only needed for unpacking; remove it whether
            # or not it turned out to be an archive
            if tmpfile:
                os.remove(tmpfile)

        return self.src

    def fetch(self, filename=None):
        """
        fetch from remote source to a temporary file
        - filename : path to write to; a temporary file is created if None
        returns the path written to
        """
        if filename is None:
            fd, filename = tempfile.mkstemp()
            os.close(fd)

        resource = urllib2.urlopen(self.src)
        try:
            # archives are binary data: write bytes, not text
            with open(filename, 'wb') as fp:
                shutil.copyfileobj(resource, fp)
        finally:
            resource.close()
        return filename

    def _unpack(self, archive):
        """
        unpack the archive to a temporary destination, stripping the single
        top-level directory; the destination is stored in self._tmppath
        - archive : filesystem path to a tar archive
        raises AssertionError if the path is not a tar archive or the
        archive does not have exactly one top-level directory
        """
        # TODO: should handle zipfile additionally at least
        # Ideally, this would be pluggable, etc
        assert tarfile.is_tarfile(archive), "%s is not an archive" % self.src

        tf = tarfile.open(archive)
        try:
            # cut off the top level directory;
            # names inside a tar archive always use '/', regardless of OS
            members = [member for member in tf.getmembers()
                       if '/' in member.name]
            tld = set()
            for member in members:
                directory, member.name = member.name.split('/', 1)
                tld.add(directory)
            assert len(tld) == 1

            # NOTE(review): member names are not validated; an archive with
            # '..' components or absolute paths could write outside of the
            # destination directory

            # extract; only cache the destination once it is fully populated
            destination = tempfile.mkdtemp()
            try:
                for member in members:
                    tf.extract(member, path=destination)
            except Exception:
                shutil.rmtree(destination, ignore_errors=True)
                raise
            self._tmppath = destination
        finally:
            tf.close()

    def _is_archive(self, path):
        """returns if the filesystem path is an archive"""
        # TODO: should handle zipfile additionally at least
        # Ideally, this would be pluggable, etc

        # tarfile.is_tarfile raises (rather than returning False) when
        # handed a directory, so screen non-files out first
        return os.path.isfile(path) and tarfile.is_tarfile(path)

    def _cleanup(self):
        """remove the temporary unpack directory, if there is one"""
        if self._tmppath:
            shutil.rmtree(self._tmppath)
        self._tmppath = None
    __del__ = _cleanup

    ### python-package-specific functionality

    def info(self):
        """return info dictionary for package"""
        self._log(">>> Getting the info")
        return self.package_info(self._path())()

    def dependencies(self):
        """return the dependencies reported by the metadata provider"""
        return self.package_info(self._path()).dependencies()

    def extension(self):
        """
        filename extension of the package
        raises Exception if the built package has no known extension
        """
        self._log("extension:Getting package")
        package = self.package()
        self._log("extension:package=%s" % package)

        # determine the extension (XXX hacky)
        extensions = ('.tar.gz', '.zip', '.tar.bz2')
        for ext in extensions:
            if package.endswith(ext):
                return ext

        raise Exception("Extension %s not found: %s" % (extensions, package))

    def package(self, destination=None):
        """
        repackage the package to ensure its actually in the right form
        and return the path to the destination
        - destination: if given, path to put the build in
        """
        self._log("package: Getting package directory, destination=%s" % repr(destination))

        if self._build_path:
            self._log("package: build_path already set: %s" % self._build_path)
            if destination:
                shutil.copy(self._build_path, destination)
                return os.path.abspath(destination)

            # return cached copy
            return self._build_path

        path = self._path()
        dist = os.path.join(path, 'dist')
        self._log("package: dist directory: %s; (path=%s)" % (dist, path))

        # start from a clean dist directory so that exactly one
        # build product is found afterwards
        if os.path.exists(dist):
            shutil.rmtree(dist)

        command = [sys.executable, 'setup.py', 'sdist']
        self._log("package: running: %s" % ' '.join(command))
        call(command, cwd=path)
        self._log("package: done running setup.py dist")

        assert os.path.exists(dist)
        contents = os.listdir(dist)
        assert len(contents) == 1

        self._build_path = os.path.join(dist, contents[0])

        # destination
        # use an evil recursive trick: the build path is now cached, so the
        # recursive call takes the copy-to-destination branch above
        if destination:
            return self.package(destination=destination)

        return self._build_path

    def download(self, directory):
        """
        download a package and all its dependencies using pip
        - directory : where to download to; created if it does not exist
        """
        if not os.path.exists(directory):
            os.makedirs(directory)
        assert os.path.isdir(directory)
        pip.main(['install', '--download', directory, self.src])

    def pypi(self, directory):
        """
        download packages for a pypi directory structure
        http://k0s.org/portfolio/pypi.html
        - directory : root of the pypi directory structure; created if it
                      does not exist
        """
        if not os.path.exists(directory):
            os.makedirs(directory)
        assert os.path.isdir(directory)

        tempdir = tempfile.mkdtemp()
        try:
            self.download(tempdir)
            files = os.listdir(tempdir)
            self._log(">>> Files: %s" % files)
            for f in files:

                # full path
                src = os.path.join(tempdir, f)

                # make a package of the thing
                self._log("pypi:Packaging %s" % src)
                package = Package(src, verbose=self.verbose)
                self._log("pypi:DONE packaging %s" % src)

                try:
                    # get destination dirname, filename
                    self._log("pypi:Getting PyPI path")
                    dirname, filename = package.pypi_path()
                    self._log("pypi:DONE PyPI path: %s/%s" % (dirname, filename))

                    # make the directory if it doesn't exist
                    subdir = os.path.join(directory, dirname)
                    if not os.path.exists(subdir):
                        os.makedirs(subdir)
                    assert os.path.isdir(subdir)

                    # move the file
                    self._log("pypi:Moving to PyPI path %s/%s" % (subdir, filename))
                    package.package(destination=os.path.join(subdir, filename))
                    self._log("Done with %s" % src)
                finally:
                    # remove the unpacked copy now instead of waiting
                    # for garbage collection
                    package._cleanup()
        finally:
            shutil.rmtree(tempdir)

    def pypi_path(self):
        """
        returns subpath 2-tuple appropriate for pypi path structure:
        http://k0s.org/portfolio/pypi.html
        The tuple is (name, '<name>-<version><extension>').
        """
        self._log(">>> pypi_path:Getting info")
        metadata = self.info()
        self._log(">>> pypi_path:DONE getting info")

        # determine the extension
        self._log(">>> pypi_path:Getting extension")
        extension = self.extension()
        self._log(">>> pypi_path:DONE Getting extension: %s" % extension)

        # get the filename destination
        name = metadata['Name']
        version = metadata['Version']
        filename = '%s-%s%s' % (name, version, extension)
        return name, filename


class PackageSet(object):
    """
    a group of packages

    (stub: the constructor always raises NotImplementedError
    and the methods have no bodies yet)
    """

    def __init__(self, *packages):
        """
        - packages : packages to add to the set
        """
        self.packages = []
        for package in packages:
            self.add(package)

        raise NotImplementedError

    def add(self, package):
        """
        add a package to the instance.
        If Package instance, add that, otherwise convert
        """
    # NOTE(review): once implemented, add() must return self for `+=`
    # to leave the name bound to the set
    __iadd__ = add

    def dependencies(self):
        """return dependencies for each package"""

    def unroll_dependencies(self):
        """unroll dependencies for a package set"""