Source code for ipfsapi.http

"""HTTP client for api requests.

This is pluggable into the IPFS Api client and will hopefully be supplemented
by an asynchronous version.
"""
from __future__ import absolute_import

import abc
import contextlib
import functools
import re
import tarfile
from six.moves import http_client

import requests
import six

from . import encoding
from . import exceptions


[docs]def pass_defaults(func): """Decorator that returns a function named wrapper. When invoked, wrapper invokes func with default kwargs appended. Parameters ---------- func : callable The function to append the default kwargs to """ @functools.wraps(func) def wrapper(self, *args, **kwargs): merged = {} merged.update(self.defaults) merged.update(kwargs) return func(self, *args, **merged) return wrapper
[docs]class HTTPClient(object): """An HTTP client for interacting with the IPFS daemon. Parameters ---------- host : str The host the IPFS daemon is running on port : int The port the IPFS daemon is running at base : str The path prefix for API calls defaults : dict The default parameters to be passed to :meth:`~ipfsapi.http.HTTPClient.request` """ __metaclass__ = abc.ABCMeta def __init__(self, host, port, base, **defaults): self.host = host self.port = port if not re.match('^https?://', host.lower()): host = 'http://' + host self.base = '%s:%s/%s' % (host, port, base) self.defaults = defaults self._session = None def _do_request(self, *args, **kwargs): try: if self._session: return self._session.request(*args, **kwargs) else: return requests.request(*args, **kwargs) except requests.ConnectionError as error: six.raise_from(exceptions.ConnectionError(error), error) except http_client.HTTPException as error: six.raise_from(exceptions.ProtocolError(error), error) except requests.Timeout as error: six.raise_from(exceptions.TimeoutError(error), error) def _do_raise_for_status(self, response, content=None): try: response.raise_for_status() except requests.exceptions.HTTPError as error: # If we have decoded an error response from the server, # use that as the exception message; otherwise, just pass # the exception on to the caller. if isinstance(content, dict) and 'Message' in content: msg = content['Message'] six.raise_from(exceptions.ErrorResponse(msg, error), error) else: six.raise_from(exceptions.StatusError(error), error) def _request(self, method, url, params, parser, stream=False, files=None, headers={}, data=None): # Do HTTP request (synchronously) res = self._do_request(method, url, params=params, stream=stream, files=files, headers=headers, data=data) if stream: # Raise exceptions for response status self._do_raise_for_status(res) # Decode each item as it is read def stream_decode(): for data in res: for result in parser.parse_partial(data): yield result for result in parser.parse_finalize(): yield result return stream_decode() else: # First decode received item ret = parser.parse(res.content) # Raise exception for response status # (optionally incorpating the response message, if applicable) self._do_raise_for_status(res, ret) return ret @pass_defaults
[docs] def request(self, path, args=[], files=[], opts={}, stream=False, decoder=None, headers={}, data=None): """Makes an HTTP request to the IPFS daemon. This function returns the contents of the HTTP response from the IPFS daemon. Raises ------ ~ipfsapi.exceptions.ErrorResponse ~ipfsapi.exceptions.ConnectionError ~ipfsapi.exceptions.ProtocolError ~ipfsapi.exceptions.StatusError ~ipfsapi.exceptions.TimeoutError Parameters ---------- path : str The REST command path to send args : list Positional parameters to be sent along with the HTTP request files : :class:`io.RawIOBase` | :obj:`str` | :obj:`list` The file object(s) or path(s) to stream to the daemon opts : dict Query string paramters to be sent along with the HTTP request decoder : str The encoder to use to parse the HTTP response kwargs : dict Additional arguments to pass to :mod:`requests` """ url = self.base + path params = [] params.append(('stream-channels', 'true')) for opt in opts.items(): params.append(opt) for arg in args: params.append(('arg', arg)) method = 'post' if (files or data) else 'get' parser = encoding.get_encoding(decoder if decoder else "none") return self._request(method, url, params, parser, stream, files, headers, data)
@pass_defaults
[docs] def download(self, path, args=[], filepath=None, opts={}, compress=True, **kwargs): """Makes a request to the IPFS daemon to download a file. Downloads a file or files from IPFS into the current working directory, or the directory given by ``filepath``. Raises ------ ~ipfsapi.exceptions.ErrorResponse ~ipfsapi.exceptions.ConnectionError ~ipfsapi.exceptions.ProtocolError ~ipfsapi.exceptions.StatusError ~ipfsapi.exceptions.TimeoutError Parameters ---------- path : str The REST command path to send filepath : str The local path where IPFS will store downloaded files Defaults to the current working directory. args : list Positional parameters to be sent along with the HTTP request opts : dict Query string paramters to be sent along with the HTTP request compress : bool Whether the downloaded file should be GZip compressed by the daemon before being sent to the client kwargs : dict Additional arguments to pass to :mod:`requests` """ url = self.base + path wd = filepath or '.' params = [] params.append(('stream-channels', 'true')) params.append(('archive', 'true')) if compress: params.append(('compress', 'true')) for opt in opts.items(): params.append(opt) for arg in args: params.append(('arg', arg)) method = 'get' res = self._do_request(method, url, params=params, stream=True, **kwargs) self._do_raise_for_status(res) # try to stream download as a tar file stream mode = 'r|gz' if compress else 'r|' with tarfile.open(fileobj=res.raw, mode=mode) as tf: tf.extractall(path=wd)
@contextlib.contextmanager
[docs] def session(self): """A context manager for this client's session. This function closes the current session when this client goes out of scope. """ self._session = requests.session() yield self._session.close() self._session = None