view paint/package.py @ 29:59524b6d25c0

implementing making pypi directory structure from a package
author Jeff Hammel <jhammel@mozilla.com>
date Fri, 30 Mar 2012 11:41:04 -0700
parents c44fb4b6918b
children fe5f282dca9b
line wrap: on
line source

"""
package model for python PAckage INTrospection
"""

import os
import pip
import pypi
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib2
import urlparse
import utils

# `call` is the subprocess runner used by this module: prefer `check_call`
# (raises on a non-zero exit status) and fall back to plain `call` on
# interpreters whose subprocess module does not provide it
try:
    from subprocess import check_call as call
except ImportError:
    from subprocess import call

__all__ = ['Package']

class Package(object):
    """
    model of a python package located at `src`

    `src` may be a URL, the path of a tar archive or anything else
    (e.g. a source directory), which `path()` hands back unchanged.
    Archives are unpacked to a temporary directory that is remembered on
    the instance until `_cleanup()` is called.
    """

    def __init__(self, src):
        """
        - src : URL, archive path or directory of the package
        """
        self.src = src

        # ephemeral data
        self._tmppath = None  # directory an archive was unpacked to
        self._egg_info_path = None  # cached path of the .egg-info directory

    @staticmethod
    def _ensure_directory(directory):
        """
        create `directory` (and its parents) if it does not exist;
        raises AssertionError if the path exists but is not a directory
        """
        if not os.path.exists(directory):
            os.makedirs(directory)
        if not os.path.isdir(directory):
            raise AssertionError("%s is not a directory" % directory)

    def path(self):
        """
        filesystem path to package

        Returns the directory an archive was unpacked to (cached across
        calls), or `self.src` itself when the source is not an archive.
        """

        # return cached copy if it exists
        if self._tmppath:
            return self._tmppath

        # fetch from the web if a URL
        tmpfile = None
        src = self.src
        if utils.isURL(self.src):
            tmpfile = src = self.fetch()

        try:
            # unpack if an archive
            if self._is_archive(src):
                self.unpack(src)
                return self._tmppath
        finally:
            # the download is only needed long enough to unpack it; remove
            # it whether or not it turned out to be an archive
            if tmpfile:
                os.remove(tmpfile)

        return self.src

    def fetch(self):
        """
        fetch from remote source to a temporary file

        Returns the name of the temporary file; the caller is responsible
        for removing it.  On failure the temporary file is removed and the
        error is re-raised.
        """
        resource = urllib2.urlopen(self.src)
        try:
            fd, filename = tempfile.mkstemp()
            try:
                # fdopen takes ownership of the descriptor and closes it
                with os.fdopen(fd, 'wb') as f:
                    shutil.copyfileobj(resource, f)
            except BaseException:
                os.remove(filename)
                raise
        finally:
            resource.close()
        return filename

    def unpack(self, archive):
        """
        unpack the archive to a temporary destination

        The archive must contain exactly one top-level directory; that
        directory is stripped so its contents land directly in the
        temporary directory, which is stored in `self._tmppath`.

        Raises AssertionError if `archive` is not a tar archive, does not
        have a single top-level directory, or has a member that would be
        extracted outside of the destination.  `self._tmppath` is only set
        when extraction succeeded.
        """
        # TODO: should handle zipfile additionally at least
        # Ideally, this would be pluggable, etc
        if not tarfile.is_tarfile(archive):
            raise AssertionError("%s is not an archive" % self.src)

        tf = tarfile.open(archive)
        try:
            # cut off the top level directory; tar member names always use
            # '/' regardless of the platform's path separator
            tld = set()
            stripped = []  # (member, name without the top-level directory)
            for member in tf.getmembers():
                if '/' not in member.name:
                    if member.isdir():
                        # entry for the top-level directory itself
                        tld.add(member.name)
                        continue
                    raise AssertionError("%s is not inside a top-level directory in %s"
                                         % (member.name, self.src))
                directory, name = member.name.split('/', 1)
                tld.add(directory)

                # the archive may have been downloaded: refuse members that
                # would be written outside of the destination directory
                if name.startswith('/') or '..' in name.split('/'):
                    raise AssertionError("unsafe path in %s: %s"
                                         % (self.src, member.name))
                stripped.append((member, name))
            if len(tld) != 1:
                raise AssertionError("Expected one top-level directory in %s, got: %s"
                                     % (self.src, sorted(tld)))

            # extract
            destination = tempfile.mkdtemp()
            try:
                for member, name in stripped:
                    member.name = name
                    tf.extract(member, path=destination)
            except BaseException:
                # do not leave a partially extracted tree behind
                shutil.rmtree(destination, ignore_errors=True)
                raise
        finally:
            tf.close()

        self._tmppath = destination

    def _is_archive(self, path):
        """returns if the filesystem path is an archive"""
        # TODO: should handle zipfile additionally at least
        # Ideally, this would be pluggable, etc

        # tarfile.is_tarfile raises (instead of returning False) when the
        # path is a directory or does not exist, so rule those out first
        return os.path.isfile(path) and tarfile.is_tarfile(path)

    def _cleanup(self):
        """remove the temporary unpack directory, if there is one"""
        if self._tmppath:
            shutil.rmtree(self._tmppath)
            # the cached .egg-info directory was built inside the removed tree
            self._egg_info_path = None
        self._tmppath = None

    # __del__ = _cleanup  # disabled: cleanup must be requested explicitly

    ### python-package-specific functionality

    def _egg_info(self):
        """
        build the egg_info directory

        Runs `setup.py egg_info` in the package directory and returns the
        path of the resulting .egg-info directory (cached across calls).
        Raises AssertionError if setup.py is missing or if there is not
        exactly one .egg-info directory afterwards.
        """

        if self._egg_info_path:
            # return cached copy
            return self._egg_info_path

        directory = self.path()
        setup_py = os.path.join(directory, 'setup.py')
        if not os.path.exists(setup_py):
            raise AssertionError("%s does not exist" % setup_py)

        # setup the egg info; the output is discarded rather than sent to
        # a pipe, as an unread pipe blocks the child once it fills up
        with open(os.devnull, 'w') as devnull:
            call([sys.executable, 'setup.py', 'egg_info'], cwd=directory, stdout=devnull)

        # get the .egg-info directory
        egg_info = [i for i in os.listdir(directory)
                    if i.endswith('.egg-info')]
        if len(egg_info) != 1:
            raise AssertionError('Expected one .egg-info directory in %s, got: %s' % (directory, egg_info))
        egg_info = os.path.join(directory, egg_info[0])
        if not os.path.isdir(egg_info):
            raise AssertionError("%s is not a directory" % egg_info)

        # cache it
        self._egg_info_path = egg_info
        return self._egg_info_path

    def info(self):
        """
        return info dictionary for package

        The dictionary maps the `Key: value` headers of the PKG-INFO file
        to their values.  Blank lines and lines starting with whitespace
        (continuation lines) are skipped.  Raises AssertionError on any
        other line without a ':'.
        """
        # could use pkginfo

        egg_info = self._egg_info()

        # read the package information
        pkg_info = os.path.join(egg_info, 'PKG-INFO')
        info_dict = {}
        with open(pkg_info) as f:
            for line in f:
                if not line or line[0].isspace():
                    continue # XXX neglects description
                if ':' not in line:
                    raise AssertionError("Unexpected line in %s: %s" % (pkg_info, line.strip()))
                key, value = [i.strip() for i in line.split(':', 1)]
                info_dict[key] = value

        # return the information
        return info_dict

    def dependencies(self):
        """
        return the dependencies

        Returns a dictionary whose keys are the non-blank lines of the
        egg-info requires.txt; a value is the matching line of
        dependency_links.txt (matched on its `#egg=` fragment) or None.
        Raises AssertionError for a dependency link without `#egg=`.
        """

        # get the egg_info directory
        egg_info = self._egg_info()

        # read the dependencies
        names = []
        requires = os.path.join(egg_info, 'requires.txt')
        if os.path.exists(requires):
            with open(requires) as f:
                names = [i.strip() for i in f if i.strip()]
        dependencies = dict.fromkeys(names)

        # read the dependency links
        dependency_links = os.path.join(egg_info, 'dependency_links.txt')
        if os.path.exists(dependency_links):
            with open(dependency_links) as f:
                links = [i.strip() for i in f if i.strip()]
            for link in links:
                # XXX pretty ghetto
                if '#egg=' not in link:
                    raise AssertionError("No #egg= in dependency link: %s" % link)
                url, dep = link.split('#egg=', 1)
                if dep in dependencies:
                    dependencies[dep] = link

        return dependencies

    def download(self, directory):
        """
        download a package and all its dependencies using pip

        - directory : where to put the downloads; created if missing
        """
        self._ensure_directory(directory)
        # NOTE(review): the status returned by pip.main is not checked
        pip.main(['install', '--download', directory, self.src])

    def pypi(self, directory):
        """
        download packages for a pypi directory structure
        http://k0s.org/portfolio/pypi.html

        - directory : root of the structure; created if missing
        """
        self._ensure_directory(directory)
        tempdir = tempfile.mkdtemp()
        try:
            self.download(tempdir)
            for package in os.listdir(tempdir):

                # full path
                src = os.path.join(tempdir, package)

                # get destination dirname, filename
                dirname, filename = pypi.pypi_path(src)

                # make the directory if it doesn't exist
                subdir = os.path.join(directory, dirname)
                self._ensure_directory(subdir)

                # move the file
                shutil.move(src, os.path.join(subdir, filename))
        finally:
            shutil.rmtree(tempdir)