view python/url.py @ 924:fc88a12acf48 default tip

add lfs filter
author Jeff Hammel <k0scist@gmail.com>
date Wed, 04 Dec 2024 15:50:53 -0800
parents f011ec45b8e8
children
line wrap: on
line source

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
url manipulation
"""

import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import urlparse
import urllib2

__all__ = ['load', 'main']
string = (str, unicode)

def ensure_dir(directory):
    """ensure `directory` is a directory"""
    if os.path.exists(directory):
        assert os.path.isdir(directory)
        return directory
    os.makedirs(directory)
    return directory

def isURL(url):
    return '://' in url

def read_s3(url):
    name = tempfile.mktemp()
    try:
        subprocess.check_output(['s3cmd', 'get', url, name])
        with open(name) as f:
            read = f.read()
        os.remove(name)
        return read
    finally:
        if os.path.exists(name):
            os.remove(name)

def read_http(url):
    return urllib2.urlopen(url).read()

def read_file(url):
    scheme = 'file://'
    if url.startswith(scheme):
        url = url[len(scheme):]
    return open(url).read()

loaders = {'s3': read_s3,
           'http': read_http,
           'https': read_http,
           'file': read_file
       }

def scheme(url):
    if '://' in url:
        parsed = urlparse.urlsplit(url)
        return parsed.scheme
    return 'file'

def parent(url):
    if '://' in url:
        return url.rsplit('/', 1)[0]
    else:
        # file
        return os.path.abspath(os.path.dirname(url))

def basename(url):
    if '://' in url:
        return url.rsplit('/', 1)[-1]
    else:
        # file
        return os.path.basename(url)

def loader(url):
    return loaders[scheme(url)]

def load(url):
    """returns the contents of a URL"""
    return loader(url)(url)

def get_file(src, dest):
    shutil.copy2(src, dest)

def get_s3(src, dest):
    subprocess.check_output(['s3cmd', 'get', src, dest])

def default_getter(src, dest):
    assert not os.path.isURL(dest)
    dirname = parent(dest)
    ensure_dir(dirname)
    with open(dest, 'w') as f:
        f.write(load(url))

getters = {'file': get_file,
           's3': get_s3
       }

def get(src, dest):
    """get a thing to a local file"""
    if os.path.isdir(dest):
        dest = os.path.join(dest, basename(src))
    return getters.get(scheme(src), default_getter)(src, dest)

def rel(base, path):
    """
    relative path to base
    otherwise, return None
    """

    if path.startswith(base):
        return path[len(base):]

def main(args=sys.argv[1:]):
    """CLI"""

    # parse command line
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('url', help='URL to read')
    parser.add_argument('-o', '--output', dest='output',
                        help="get to this location")
    options = parser.parse_args(args)

    if options.output:
        # copy src to this location
        get(options.url, options.output)
        sys.exit()

    # read location
    contents = load(options.url)

    # output
    print (contents)

if __name__ == '__main__':
    main()