Mercurial > hg > KCl
view kcl/ec2.py @ 0:0f44ee073173 default tip
fake salt, initial commit
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Mon, 06 Feb 2017 01:10:22 +0000 |
parents | |
children |
line wrap: on
line source
""" code for interfacing with EC2 instances: curl http://169.254.169.254/latest/meta-data/ """ # imports import argparse import boto.utils import hashlib import hmac import json import os import requests import sys import urllib import urlparse import ConfigParser from collections import OrderedDict from datetime import datetime class FilesystemCredentials(object): """ Read credentials from the filesystem. See: - http://boto.cloudhackers.com/en/latest/boto_config_tut.html - https://blogs.aws.amazon.com/security/post/Tx3D6U6WSFGOK2H/A-New-and-Standardized-Way-to-Manage-Credentials-in-the-AWS-SDKs In Unix/Linux systems, on startup, the boto library looks for configuration files in the following locations and in the following order: /etc/boto.cfg - for site-wide settings that all users on this machine will use (if profile is given) ~/.aws/credentials - for credentials shared between SDKs (if profile is given) ~/.boto - for user-specific settings ~/.aws/credentials - for credentials shared between SDKs ~/.boto - for user-specific settings """ def read_aws_credentials(self, fp, section='default'): parser = ConfigParser.RawConfigParser() parser.readfp(fp) if section in parser.sections(): key = 'aws_access_key_id' if parser.has_option(section, key): secret = 'aws_secret_access_key' if parser.has_option(section, secret): return (parser.get(section, key), parser.get(section, secret)) def __init__(self): self.resolution = OrderedDict() home = os.environ['HOME'] if home: self.resolution[os.path.join(home, '.aws', 'credentials')] = self.read_aws_credentials def __call__(self): """ return credentials....*if* available """ for path, method in self.resolution.items(): if os.path.isfile(path): with open(path, 'r') as f: credentials = method(f) if credentials: return credentials class EC2Metadata(object): """EC2 instance metadata interface""" def __init__(self, **kwargs): self._kwargs = kwargs def __call__(self): """http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html""" return boto.utils.get_instance_metadata(**self._kwargs) def security_credentials(self): """ return IAM credentials for an instance, if possible See: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials """ # TODO: nested dict -> object notation mapping ; # note also this is actually a `LazyLoader` value, # not actually a dict return self()['iam']['security-credentials'] def credentials(self, role=None): """return active credentials""" security_credentials = self.security_credentials() if not security_credentials: raise AssertionError("No security credentials available") roles=', '.join(sorted(security_credentials.keys())) if role is None: if len(security_credentials) > 1: raise AssertionError("No role given and multiple roles found for instance: {roles}".format(roles)) role = security_credentials.keys()[0] if role not in security_credentials: raise KeyError("Role {role} not in available IAM roles: {roles}".format(role=role, roles=roles)) return security_credentials[role] class AWSCredentials(FilesystemCredentials): """ try to read credentials from the filesystem then from ec2 metadata """ def __call__(self): # return filesystem crednetials, if any credentials = FilesystemCredentials.__call__(self) if credentials: return credentials # otherwise try to return credentials from metadata metadata = EC2Metadata() try: ec2_credentials = metadata.credentials() except AssertionError: return keys = ('AccessKeyId', 'SecretAccessKey') if set(keys).issubset(ec2_credentials.keys()): return [ec2_credentials[key] for key in keys] class SignedRequest(object): """ Signed request using Signature Version 4 http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html """ # signing information: algorithm = 'AWS4-HMAC-SHA256' termination_string = 'aws4_request' authorization_header = "{algorithm} Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}" # date format: date_format = '%Y%m%d' time_format = '%H%M%S' ### date methods @classmethod def datetime_format(cls): return '{date_format}T{time_format}Z'.format(date_format=cls.date_format, time_format=cls.time_format) @classmethod def datetime(cls, _datetime=None): """ returns formatted datetime string as appropriate for `x-amz-date` header """ if _datetime is None: _datetime = datetime.utcnow() return _datetime.strftime(cls.datetime_format()) ### constructor def __init__(self, access_key, secret_key, region, service): self.access_key = access_key self.secret_key = secret_key self.region = region self.service = service ### hashing methods def hash(self, message): """hash a `message`""" # from e.g. http://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html#sig-v4-examples-get-auth-header return hashlib.sha256(message).hexdigest() def sign(self, key, msg): """ See: http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python """ return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() def signature_key(self, date_stamp): parts = [date_stamp, self.region, self.service, self.termination_string] signed = ('AWS4' + self.secret_key).encode('utf-8') while parts: signed = self.sign(signed, parts.pop(0)) return signed ### def credential_scope(self, date_string): """ a string that includes the date, the region you are targeting, the service you are requesting, and a termination string ("aws4_request") in lowercase characters. """ parts = [date_string, self.region, self.service, self.termination_string] # TODO: "The region and service name strings must be UTF-8 encoded." return '/'.join(parts) ### method for canonical components @classmethod def canonical_uri(cls, path): """ The canonical URI is the URI-encoded version of the absolute path component of the URI """ if path == '/': path = None if path: canonical_uri = urllib.quote(path, safe='') else: # If the absolute path is empty, use a forward slash (/) canonical_uri = '/' return canonical_uri @classmethod def canonical_query(cls, query_string): """ returns the canonical query string """ # TODO: currently this does not use `cls` # split into parameter names + values query = urlparse.parse_qs(query_string) # make this into a more appropriate data structure for processing keyvalues = sum([[[key, value] for value in values] for key, values in query.items()], []) # a. URI-encode each parameter name and value def encode(string): return urllib.quote(string, safe='/') encoded = [[encode(string) for string in pair] for pair in keyvalues] # b. Sort the encoded parameter names by character code in ascending order (ASCII order) encoded.sort() # c. Build the canonical query string by starting with the first parameter name in the sorted list. # d. For each parameter, append the URI-encoded parameter name, followed by the character '=' (ASCII code 61), followed by the URI-encoded parameter value. # e. Append the character '&' (ASCII code 38) after each parameter value, except for the last value in the list. retval = '&'.join(['{name}={value}'.format(name=name, value=value) for name, value in encoded]) return retval @classmethod def signed_headers(cls, headers): """ return a list of signed headers """ names = [name.lower() for name in headers.keys()] names.sort() return ';'.join(names) @classmethod def canonical_headers(cls, headers): """ return canonical headers: Construct each header according to the following rules: * Append the lowercase header name followed by a colon. * ... See: - http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html - http://docs.python-requests.org/en/latest/user/quickstart/#custom-headers """ canonical_headers = [] for key, value in headers.items(): # convert header name to lowercase key = key.lower() # trim excess white space from the header values value = value.strip() # convert sequential spaces in the value to a single space. # However, do not remove extra spaces from any values # that are inside quotation marks. quote = '"' if not (value and value[0] == quote and value[-1] == quote): value = ' '.join(value.split()) canonical_headers.append((key, value)) # check for duplicate headers names = [name for name, value in canonical_headers] if len(set(names)) != len(names): raise AssertionError("You have duplicate header names :( While AWS supports this use-case, this library doesn't yet") # Append a comma-separated list of values for that header. # If there are duplicate headers, the values are comma-separated. # Do not sort the values in headers that have multiple values. # Build the canonical headers list by sorting the headers by lowercase character code canonical_headers.sort(key=lambda x: x[0]) # return canonical headers return canonical_headers def __call__(self, url, method='GET', headers=None, session=None): """create a signed request and return the response""" if session: raise NotImplementedError('TODO') else: session = requests.Session() signed_request = self.signed_request(url, method=method, headers=headers) response = session.send(signed_request) return response def canonical_request(self, url, headers, payload='', method='GET'): """ Return canonical request url: "http://k0s.org/home/documents and settings" GET %2Fhome%2Fdocuments%20and%20settings ... """ # parse the url parsed = urlparse.urlsplit(url) # get canonical URI canonical_uri = self.canonical_uri(parsed.path) # construct the canonical query string canonical_query = self.canonical_query(parsed.query) # get the canonical headers canonical_headers = self.canonical_headers(headers) # format the canonical headers canonical_header_string = '\n'.join(['{0}:{1}'.format(*header) for header in canonical_headers]) + '\n' # get the signed headers signed_headers = self.signed_headers(headers) # get the hashed payload hashed_payload = self.hash(payload) # join the parts to make the request: # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html # CanonicalRequest = # HTTPRequestMethod + '\n' + # CanonicalURI + '\n' + # CanonicalQueryString + '\n' + # CanonicalHeaders + '\n' + # SignedHeaders + '\n' + # HexEncode(Hash(RequestPayload)) parts = [method, canonical_uri, canonical_query, canonical_header_string, signed_headers, hashed_payload] canonical_request = '\n'.join(parts) return canonical_request def signed_request(self, url, method='GET', headers=None): """ prepare a request: http://docs.python-requests.org/en/latest/user/advanced/#prepared-requests """ # parse the URL, since we like doing that so much parsed = urlparse.urlsplit(url) # setup the headers if headers is None: headers = {} headers = OrderedDict(headers).copy() mapping = dict([(key.lower(), key) for key in headers]) # XXX this is..."fun" # maybe we should just x-form everything to lowercase now? # ensure host header is set if 'host' not in mapping: headers['Host'] = parsed.netloc # add the `x-amz-date` in terms of now: # http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonRequestHeaders.html if 'x-amz-date' not in mapping: headers['x-amz-date'] = self.datetime() # create a PreparedRequest req = requests.Request(method, url, headers) prepped = req.prepare() # return a signed version return self.sign_request(prepped) def sign_request(self, request): """ sign a request; http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html """ # ensure that we have a `x-amz-date` header in the request request_date = request.headers.get('x-amz-date') # Food for thought: perhaps here is a more appropriate place # to add the headers? probably. Likely. if request_date is None: raise NotImplementedError('TODO') # get the canonical request canonical_request = self.canonical_request(method=request.method, url=request.url, headers=request.headers) # Create a digest (hash) of the canonical request # with the same algorithm that you used to hash the payload. hashed_request = self.hash(canonical_request) # Create the string to sign: # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html # 1. Start with the algorithm designation parts = [self.algorithm] # 2. Append the request date value parts.append(request_date) # XXX we could validate the format # 3. Append the credential scope value date_string = request_date.split('T')[0] # XXX could do better credential_scope = self.credential_scope(date_string) parts.append(credential_scope) # 4. Append the hash of the canonical request parts.append(hashed_request) string_to_sign = '\n'.join(parts) # Calculate the AWS Signature Version 4 # http://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html # 1. Derive your signing key. signing_key = self.signature_key(date_string) # 2. Calculate the signature signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest() # Add the signing information to the request # http://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html authorization = self.authorization_header.format(algorithm=self.algorithm, access_key=self.access_key, credential_scope=credential_scope, signed_headers=self.signed_headers(request.headers), signature=signature) request.headers['Authorization'] = authorization # return the prepared requests return request def main(args=sys.argv[1:]): """CLI""" # parse command line parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('--credentials', '--print-credentials', dest='print_credentials', action='store_true', default=False, help="print default credentials for instance") parser.add_argument('--url', dest='url', help="hit this URL with a signed HTTP GET request") parser.add_argument('--service', dest='service', default='es', help="AWS service to use") parser.add_argument('--region', dest='region', default='us-west-1', help="AWS region") # TODO: `service` and `region` come from # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html # We need to be able to derive the region from the environment. # It would be very nice to derive the service from, say, the host options = parser.parse_args(args) if options.url: # get credentials credentials = AWSCredentials()() if not credentials: parser.error("No AWS credentials found") aws_key, aws_secret = credentials # make a signed request to the URL and exit request = SignedRequest(aws_key, aws_secret, region=options.region, service=options.service) response = request(options.url, method='GET') print ('-'*10) print (response.text) response.raise_for_status() return # metadata interface metadata = EC2Metadata() # get desired data if options.print_credentials: data = metadata.credentials() else: data = metadata() # display data print (json.dumps(data, indent=2, sort_keys=True)) if __name__ == '__main__': main()