"""HTTP :mimetype:`multipart/*`-encoded file streaming.
"""
from __future__ import absolute_import
import re
import requests
import io
import os
from inspect import isgenerator
from uuid import uuid4
import six
from six.moves.urllib.parse import quote
from . import utils
if six.PY3:
from builtins import memoryview as buffer
CRLF = b'\r\n'
default_chunk_size = 4096
[docs]def content_disposition(fn, disptype='file'):
"""Returns a dict containing the MIME content-disposition header for a file.
.. code-block:: python
>>> content_disposition('example.txt')
{'Content-Disposition': 'file; filename="example.txt"'}
>>> content_disposition('example.txt', 'attachment')
{'Content-Disposition': 'attachment; filename="example.txt"'}
Parameters
----------
fn : str
Filename to retrieve the MIME content-disposition for
disptype : str
Rhe disposition type to use for the file
"""
disp = '%s; filename="%s"' % (
disptype,
quote(fn, safe='')
)
return {'Content-Disposition': disp}
[docs]def content_type(fn):
"""Returns a dict with the content-type header for a file.
Guesses the mimetype for a filename and returns a dict
containing the content-type header.
.. code-block:: python
>>> content_type('example.txt')
{'Content-Type': 'text/plain'}
>>> content_type('example.jpeg')
{'Content-Type': 'image/jpeg'}
>>> content_type('example')
{'Content-Type': 'application/octet-stream'}
Parameters
----------
fn : str
Filename to guess the content-type for
"""
return {'Content-Type': utils.guess_mimetype(fn)}
[docs]def multipart_content_type(boundary, subtype='mixed'):
"""Creates a MIME multipart header with the given configuration.
Returns a dict containing a MIME multipart header with the given
boundary.
.. code-block:: python
>>> multipart_content_type('8K5rNKlLQVyreRNncxOTeg')
{'Content-Type': 'multipart/mixed; boundary="8K5rNKlLQVyreRNncxOTeg"'}
>>> multipart_content_type('8K5rNKlLQVyreRNncxOTeg', 'alt')
{'Content-Type': 'multipart/alt; boundary="8K5rNKlLQVyreRNncxOTeg"'}
Parameters
----------
boundry : str
The content delimiter to put into the header
subtype : str
The subtype in :mimetype:`multipart/*`-domain to put into the header
"""
ctype = 'multipart/%s; boundary="%s"' % (
subtype,
boundary
)
return {'Content-Type': ctype}
[docs]class BodyGenerator(object):
"""Generators for creating the body of a :mimetype:`multipart/*`
HTTP request.
Parameters
----------
name : str
The filename of the file(s)/content being encoded
disptype : str
The ``Content-Disposition`` of the content
subtype : str
The :mimetype:`multipart/*`-subtype of the content
boundary : str
An identifier used as a delimiter for the content's body
"""
def __init__(self, name, disptype='file', subtype='mixed', boundary=None):
# If the boundary is unspecified, make a random one
if boundary is None:
boundary = self._make_boundary()
self.boundary = boundary
headers = content_disposition(name, disptype=disptype)
headers.update(multipart_content_type(boundary, subtype=subtype))
self.headers = headers
def _make_boundary(self):
"""Returns a random hexadecimal string (UUID 4).
The HTTP multipart request body spec requires a boundary string to
separate different content chunks within a request, and this is
usually a random string. Using a UUID is an easy way to generate
a random string of appropriate length as this content separator.
"""
return uuid4().hex
def _write_headers(self, headers):
"""Yields the HTTP header text for some content.
Parameters
----------
headers : dict
The headers to yield
"""
if headers:
for name in sorted(headers.keys()):
yield name.encode("ascii")
yield b': '
yield headers[name].encode("ascii")
yield CRLF
yield CRLF
[docs] def write_headers(self):
"""Yields the HTTP header text for the content."""
for c in self._write_headers(self.headers):
yield c
[docs] def open(self, **kwargs):
"""Yields the body section for the content.
"""
yield b'--'
yield self.boundary.encode()
yield CRLF
[docs] def file_open(self, fn):
"""Yields the opening text of a file section in multipart HTTP.
Parameters
----------
fn : str
Filename for the file being opened and added to the HTTP body
"""
yield b'--'
yield self.boundary.encode()
yield CRLF
headers = content_disposition(fn)
headers.update(content_type(fn))
for c in self._write_headers(headers):
yield c
[docs] def file_close(self):
"""Yields the end text of a file section in HTTP multipart encoding."""
yield CRLF
[docs] def close(self):
"""Yields the ends of the content area in a HTTP multipart body."""
yield b'--'
yield self.boundary.encode()
yield b'--'
yield CRLF
[docs]class BufferedGenerator(object):
"""Generator that encodes multipart/form-data.
An abstract buffered generator class which encodes
:mimetype:`multipart/form-data`.
Parameters
----------
name : str
The name of the file to encode
chunk_size : int
The maximum size that any single file chunk may have in bytes
"""
def __init__(self, name, chunk_size=default_chunk_size):
self.chunk_size = chunk_size
self._internal = bytearray(chunk_size)
self.buf = buffer(self._internal)
self.name = name
self.envelope = BodyGenerator(self.name,
disptype='form-data',
subtype='form-data')
self.headers = self.envelope.headers
[docs] def file_chunks(self, fp):
"""Yields chunks of a file.
Parameters
----------
fp : io.RawIOBase
The file to break into chunks
(must be an open file or have the ``readinto`` method)
"""
fsize = utils.file_size(fp)
offset = 0
if hasattr(fp, 'readinto'):
while offset < fsize:
nb = fp.readinto(self._internal)
yield self.buf[:nb]
offset += nb
else:
while offset < fsize:
nb = min(self.chunk_size, fsize - offset)
yield fp.read(nb)
offset += nb
[docs] def gen_chunks(self, gen):
"""Generates byte chunks of a given size.
Takes a bytes generator and yields chunks of a maximum of
``chunk_size`` bytes.
Parameters
----------
gen : generator
The bytes generator that produces the bytes
"""
for data in gen:
size = len(data)
if size < self.chunk_size:
yield data
else:
mv = buffer(data)
offset = 0
while offset < size:
nb = min(self.chunk_size, size - offset)
yield mv[offset:offset + nb]
offset += nb
[docs] def body(self, *args, **kwargs):
"""Returns the body of the buffered file.
.. note:: This function is not actually implemented.
"""
raise NotImplementedError
[docs] def close(self):
"""Yields the closing text of a multipart envelope."""
for chunk in self.gen_chunks(self.envelope.close()):
yield chunk
[docs]class FileStream(BufferedGenerator):
"""Generator that encodes multiples files into HTTP multipart.
A buffered generator that encodes an array of files as
:mimetype:`multipart/form-data`. This is a concrete implementation of
:class:`~ipfsapi.multipart.BufferedGenerator`.
Parameters
----------
name : str
The filename of the file to encode
chunk_size : int
The maximum size that any single file chunk may have in bytes
"""
def __init__(self, files, chunk_size=default_chunk_size):
BufferedGenerator.__init__(self, 'files', chunk_size=chunk_size)
self.files = utils.clean_files(files)
[docs] def body(self):
"""Yields the body of the buffered file."""
for fp, need_close in self.files:
try:
name = os.path.basename(fp.name)
except AttributeError:
name = ''
for chunk in self.gen_chunks(self.envelope.file_open(name)):
yield chunk
for chunk in self.file_chunks(fp):
yield chunk
for chunk in self.gen_chunks(self.envelope.file_close()):
yield chunk
if need_close:
fp.close()
for chunk in self.close():
yield chunk
[docs]def glob_compile(pat):
"""Translate a shell glob PATTERN to a regular expression.
This is almost entirely based on `fnmatch.translate` source-code from the
python 3.5 standard-library.
"""
i, n = 0, len(pat)
res = ''
while i < n:
c = pat[i]
i = i + 1
if c == '/' and len(pat) > (i + 2) and pat[i:(i + 3)] == '**/':
# Special-case for "any number of sub-directories" operator since
# may also expand to no entries:
# Otherwise `a/**/b` would expand to `a[/].*[/]b` which wouldn't
# match the immediate sub-directories of `a`, like `a/b`.
i = i + 3
res = res + '[/]([^/]*[/])*'
elif c == '*':
if len(pat) > i and pat[i] == '*':
i = i + 1
res = res + '.*'
else:
res = res + '[^/]*'
elif c == '?':
res = res + '[^/]'
elif c == '[':
j = i
if j < n and pat[j] == '!':
j = j + 1
if j < n and pat[j] == ']':
j = j + 1
while j < n and pat[j] != ']':
j = j + 1
if j >= n:
res = res + '\\['
else:
stuff = pat[i:j].replace('\\', '\\\\')
i = j + 1
if stuff[0] == '!':
stuff = '^' + stuff[1:]
elif stuff[0] == '^':
stuff = '\\' + stuff
res = '%s[%s]' % (res, stuff)
else:
res = res + re.escape(c)
return re.compile('^' + res + '\Z(?ms)' + '$')
[docs]class DirectoryStream(BufferedGenerator):
"""Generator that encodes a directory into HTTP multipart.
A buffered generator that encodes an array of files as
:mimetype:`multipart/form-data`. This is a concrete implementation of
:class:`~ipfsapi.multipart.BufferedGenerator`.
Parameters
----------
directory : str
The filepath of the directory to encode
patterns : str | list
A single glob pattern or a list of several glob patterns and
compiled regular expressions used to determine which filepaths to match
chunk_size : int
The maximum size that any single file chunk may have in bytes
"""
def __init__(self,
directory,
recursive=False,
patterns='**',
chunk_size=default_chunk_size):
BufferedGenerator.__init__(self, directory, chunk_size=chunk_size)
self.patterns = []
patterns = [patterns] if isinstance(patterns, str) else patterns
for pattern in patterns:
if isinstance(pattern, str):
self.patterns.append(glob_compile(pattern))
else:
self.patterns.append(pattern)
self.directory = os.path.normpath(directory)
self.recursive = recursive
self._request = self._prepare()
self.headers = self._request.headers
[docs] def body(self):
"""Returns the HTTP headers for this directory upload request."""
return self._request.body
def _prepare(self):
"""Pre-formats the multipart HTTP request to transmit the directory."""
names = []
added_directories = set()
def add_directory(short_path):
# Do not continue if this directory has already been added
if short_path in added_directories:
return
# Scan for first super-directory that has already been added
dir_base = short_path
dir_parts = []
while dir_base:
dir_base, dir_name = os.path.split(dir_base)
dir_parts.append(dir_name)
if dir_base in added_directories:
break
# Add missing intermediate directory nodes in the right order
while dir_parts:
dir_base = os.path.join(dir_base, dir_parts.pop())
# Create an empty, fake file to represent the directory
mock_file = io.StringIO()
mock_file.write(u'')
# Add this directory to those that will be sent
names.append(('files',
(dir_base, mock_file, 'application/x-directory')))
# Remember that this directory has already been sent
added_directories.add(dir_base)
def add_file(short_path, full_path):
try:
# Always add files in wildcard directories
names.append(('files', (short_name,
open(full_path, 'rb'),
'application/octet-stream')))
except OSError:
# File might have disappeared between `os.walk()` and `open()`
pass
def match_short_path(short_path):
# Remove initial path component so that all files are based in
# the target directory itself (not one level above)
if os.sep in short_path:
path = short_path.split(os.sep, 1)[1]
else:
return False
# Convert all path seperators to POSIX style
path = path.replace(os.sep, '/')
# Do the matching and the simplified path
for pattern in self.patterns:
if pattern.match(path):
return True
return False
# Identify the unecessary portion of the relative path
truncate = os.path.dirname(self.directory)
# Traverse the filesystem downward from the target directory's uri
# Errors: `os.walk()` will simply return an empty generator if the
# target directory does not exist.
wildcard_directories = set()
for curr_dir, _, files in os.walk(self.directory):
# find the path relative to the directory being added
if len(truncate) > 0:
_, _, short_path = curr_dir.partition(truncate)
else:
short_path = curr_dir
# remove leading / or \ if it is present
if short_path.startswith(os.sep):
short_path = short_path[1:]
wildcard_directory = False
if os.path.split(short_path)[0] in wildcard_directories:
# Parent directory has matched a pattern, all sub-nodes should
# be added too
wildcard_directories.add(short_path)
wildcard_directory = True
else:
# Check if directory path matches one of the patterns
if match_short_path(short_path):
# Directory matched pattern and it should therefor
# be added along with all of its contents
wildcard_directories.add(short_path)
wildcard_directory = True
# Always add directories within wildcard directories - even if they
# are empty
if wildcard_directory:
add_directory(short_path)
# Iterate across the files in the current directory
for filename in files:
# Find the filename relative to the directory being added
short_name = os.path.join(short_path, filename)
filepath = os.path.join(curr_dir, filename)
if wildcard_directory:
# Always add files in wildcard directories
add_file(short_name, filepath)
else:
# Add file (and all missing intermediary directories)
# if it matches one of the patterns
if match_short_path(short_name):
add_directory(short_path)
add_file(short_name, filepath)
# Send the request and present the response body to the user
req = requests.Request("POST", 'http://localhost', files=names)
prep = req.prepare()
return prep
[docs]class BytesStream(BufferedGenerator):
"""A buffered generator that encodes bytes as
:mimetype:`multipart/form-data`.
Parameters
----------
data : bytes
The binary data to stream to the daemon
chunk_size : int
The maximum size of a single data chunk
"""
def __init__(self, data, chunk_size=default_chunk_size):
BufferedGenerator.__init__(self, 'bytes', chunk_size=chunk_size)
self.data = data if isgenerator(data) else (data,)
[docs] def body(self):
"""Yields the encoded body."""
for chunk in self.gen_chunks(self.envelope.file_open(self.name)):
yield chunk
for chunk in self.gen_chunks(self.data):
yield chunk
for chunk in self.gen_chunks(self.envelope.file_close()):
yield chunk
for chunk in self.close():
yield chunk
[docs]def stream_files(files, chunk_size=default_chunk_size):
"""Gets a buffered generator for streaming files.
Returns a buffered generator which encodes a file or list of files as
:mimetype:`multipart/form-data` with the corresponding headers.
Parameters
----------
files : str
The file(s) to stream
chunk_size : int
Maximum size of each stream chunk
"""
stream = FileStream(files, chunk_size=chunk_size)
return stream.body(), stream.headers
[docs]def stream_directory(directory,
recursive=False,
patterns='**',
chunk_size=default_chunk_size):
"""Gets a buffered generator for streaming directories.
Returns a buffered generator which encodes a directory as
:mimetype:`multipart/form-data` with the corresponding headers.
Parameters
----------
directory : str
The filepath of the directory to stream
recursive : bool
Stream all content within the directory recursively?
patterns : str | list
Single *glob* pattern or list of *glob* patterns and compiled
regular expressions to match the names of the filepaths to keep
chunk_size : int
Maximum size of each stream chunk
"""
stream = DirectoryStream(directory,
recursive=recursive,
patterns=patterns,
chunk_size=chunk_size)
return stream.body(), stream.headers
[docs]def stream_filesystem_node(path,
recursive=False,
patterns='**',
chunk_size=default_chunk_size):
"""Gets a buffered generator for streaming either files or directories.
Returns a buffered generator which encodes the file or directory at the
given path as :mimetype:`multipart/form-data` with the corresponding
headers.
Parameters
----------
path : str
The filepath of the directory or file to stream
recursive : bool
Stream all content within the directory recursively?
patterns : str | list
Single *glob* pattern or list of *glob* patterns and compiled
regular expressions to match the names of the filepaths to keep
chunk_size : int
Maximum size of each stream chunk
"""
is_dir = isinstance(path, six.string_types) and os.path.isdir(path)
if recursive or is_dir:
return stream_directory(path, recursive, patterns, chunk_size)
else:
return stream_files(path, chunk_size)
[docs]def stream_bytes(data, chunk_size=default_chunk_size):
"""Gets a buffered generator for streaming binary data.
Returns a buffered generator which encodes binary data as
:mimetype:`multipart/form-data` with the corresponding headers.
Parameters
----------
data : bytes
The data bytes to stream
chunk_size : int
The maximum size of each stream chunk
Returns
-------
(generator, dict)
"""
stream = BytesStream(data, chunk_size=chunk_size)
return stream.body(), stream.headers
[docs]def stream_text(text, chunk_size=default_chunk_size):
"""Gets a buffered generator for streaming text.
Returns a buffered generator which encodes a string as
:mimetype:`multipart/form-data` with the corresponding headers.
Parameters
----------
text : str
The data bytes to stream
chunk_size : int
The maximum size of each stream chunk
Returns
-------
(generator, dict)
"""
if isgenerator(text):
def binary_stream():
for item in text:
if six.PY2 and isinstance(text, six.binary_type):
#PY2: Allow binary strings under Python 2 since
# Python 2 code is not expected to always get the
# distinction between text and binary strings right.
yield text
else:
yield text.encode("utf-8")
data = binary_stream()
elif six.PY2 and isinstance(text, six.binary_type):
#PY2: See above.
data = text
else:
data = text.encode("utf-8")
return stream_bytes(data, chunk_size)