Kaydet (Commit) 68b92ce7 authored tarafından Ben Firshman's avatar Ben Firshman Kaydeden (comit) Joffrey F

Add new user-focused API

See #1086
Signed-off-by: 's avatarBen Firshman <ben@firshman.co.uk>
üst d3c9fcfa
docker-py
=========
# Docker SDK for Python
[![Build Status](https://travis-ci.org/docker/docker-py.png)](https://travis-ci.org/docker/docker-py)
A Python library for the Docker Remote API. It does everything the `docker` command does, but from within Python – run containers, manage them, pull/push images, etc.
A Python library for the Docker API. It lets you do anything the `docker` command does, but from within Python apps – run containers, manage containers, manage Swarms, etc.
Installation
------------
## Installation
The latest stable version is always available on PyPi.
The latest stable version [is available on PyPi](https://pypi.python.org/pypi/docker/). Either add `docker` to your `requirements.txt` file or install with pip:
pip install docker-py
pip install docker
Documentation
-------------
## Usage
[![Documentation Status](https://readthedocs.org/projects/docker-py/badge/?version=latest)](https://readthedocs.org/projects/docker-py/?badge=latest)
Connect to Docker using the default socket or the configuration in your environment:
[Read the full documentation here](https://docker-py.readthedocs.io/en/latest/).
The source is available in the `docs/` directory.
```python
import docker
client = docker.from_env()
```
You can run containers:
License
-------
Docker is licensed under the Apache License, Version 2.0. See LICENSE for full license text
```python
>>> client.containers.run("ubuntu", "echo hello world")
'hello world\n'
```
You can run containers in the background:
```python
>>> client.containers.run("bfirsh/reticulate-splines", detach=True)
<Container '45e6d2de7c54'>
```
You can manage containers:
```python
>>> client.containers.list()
[<Container '45e6d2de7c54'>, <Container 'db18e4f20eaa'>, ...]
>>> container = client.containers.get('45e6d2de7c54')
>>> container.attrs['Config']['Image']
"bfirsh/reticulate-splines"
>>> container.logs()
"Reticulating spline 1...\n"
>>> container.stop()
```
You can stream logs:
```python
>>> for line in container.logs(stream=True):
... print line.strip()
Reticulating spline 2...
Reticulating spline 3...
...
```
You can manage images:
```python
>>> client.images.pull('nginx')
<Image 'nginx'>
>>> client.images.list()
[<Image 'ubuntu'>, <Image 'nginx'>, ...]
```
[Read the full documentation](https://docs.docker.com/sdk/python/) to see everything you can do.
# flake8: noqa
from .api import APIClient
from .client import Client, from_env
from .version import version, version_info
__version__ = version
......
......@@ -22,10 +22,11 @@ from ..constants import (DEFAULT_TIMEOUT_SECONDS, DEFAULT_USER_AGENT,
IS_WINDOWS_PLATFORM, DEFAULT_DOCKER_API_VERSION,
STREAM_HEADER_SIZE_BYTES, DEFAULT_NUM_POOLS,
MINIMUM_DOCKER_API_VERSION)
from ..errors import DockerException, APIError, TLSParameterError, NotFound
from ..errors import (DockerException, TLSParameterError,
create_api_error_from_http_exception)
from ..tls import TLSConfig
from ..transport import UnixAdapter
from ..utils import utils, check_resource, update_headers, kwargs_from_env
from ..utils import utils, check_resource, update_headers
from ..utils.socket import frames_iter
try:
from ..transport import NpipeAdapter
......@@ -33,10 +34,6 @@ except ImportError:
pass
def from_env(**kwargs):
return APIClient.from_env(**kwargs)
class APIClient(
requests.Session,
BuildApiMixin,
......@@ -152,13 +149,6 @@ class APIClient(
MINIMUM_DOCKER_API_VERSION, self._version)
)
@classmethod
def from_env(cls, **kwargs):
timeout = kwargs.pop('timeout', None)
version = kwargs.pop('version', None)
return cls(timeout=timeout, version=version,
**kwargs_from_env(**kwargs))
def _retrieve_server_version(self):
try:
return self.version(api_version=False)["ApiVersion"]
......@@ -212,14 +202,12 @@ class APIClient(
else:
return '{0}{1}'.format(self.base_url, pathfmt.format(*args))
def _raise_for_status(self, response, explanation=None):
def _raise_for_status(self, response):
"""Raises stored :class:`APIError`, if one occurred."""
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
raise NotFound(e, response, explanation=explanation)
raise APIError(e, response, explanation=explanation)
raise create_api_error_from_http_exception(e)
def _result(self, response, json=False, binary=False):
assert not (json and binary)
......
from .api.client import APIClient
from .models.containers import ContainerCollection
from .models.images import ImageCollection
from .models.networks import NetworkCollection
from .models.nodes import NodeCollection
from .models.services import ServiceCollection
from .models.swarm import Swarm
from .models.volumes import VolumeCollection
from .utils import kwargs_from_env
class Client(object):
"""
A client for communicating with a Docker server.
Example:
>>> import docker
>>> client = Client(base_url='unix://var/run/docker.sock')
Args:
base_url (str): URL to the Docker server. For example,
``unix:///var/run/docker.sock`` or ``tcp://127.0.0.1:1234``.
version (str): The version of the API to use. Set to ``auto`` to
automatically detect the server's version. Default: ``1.24``
timeout (int): Default timeout for API calls, in seconds.
tls (bool or :py:class:`~docker.tls.TLSConfig`): Enable TLS. Pass
``True`` to enable it with default options, or pass a
:py:class:`~docker.tls.TLSConfig` object to use custom
configuration.
user_agent (str): Set a custom user agent for requests to the server.
"""
def __init__(self, *args, **kwargs):
self.api = APIClient(*args, **kwargs)
@classmethod
def from_env(cls, **kwargs):
"""
Return a client configured from environment variables.
The environment variables used are the same as those used by the
Docker command-line client. They are:
.. envvar:: DOCKER_HOST
The URL to the Docker host.
.. envvar:: DOCKER_TLS_VERIFY
Verify the host against a CA certificate.
.. envvar:: DOCKER_CERT_PATH
A path to a directory containing TLS certificates to use when
connecting to the Docker host.
Args:
version (str): The version of the API to use. Set to ``auto`` to
automatically detect the server's version. Default: ``1.24``
timeout (int): Default timeout for API calls, in seconds.
ssl_version (int): A valid `SSL version`_.
assert_hostname (bool): Verify the hostname of the server.
environment (dict): The environment to read environment variables
from. Default: the value of ``os.environ``
Example:
>>> import docker
>>> client = docker.from_env()
.. _`SSL version`:
https://docs.python.org/3.5/library/ssl.html#ssl.PROTOCOL_TLSv1
"""
timeout = kwargs.pop('timeout', None)
version = kwargs.pop('version', None)
return cls(timeout=timeout, version=version,
**kwargs_from_env(**kwargs))
# Resources
@property
def containers(self):
"""
An object for managing containers on the server. See the
:doc:`containers documentation <containers>` for full details.
"""
return ContainerCollection(client=self)
@property
def images(self):
"""
An object for managing images on the server. See the
:doc:`images documentation <images>` for full details.
"""
return ImageCollection(client=self)
@property
def networks(self):
"""
An object for managing networks on the server. See the
:doc:`networks documentation <networks>` for full details.
"""
return NetworkCollection(client=self)
@property
def nodes(self):
"""
An object for managing nodes on the server. See the
:doc:`nodes documentation <nodes>` for full details.
"""
return NodeCollection(client=self)
@property
def services(self):
"""
An object for managing services on the server. See the
:doc:`services documentation <services>` for full details.
"""
return ServiceCollection(client=self)
@property
def swarm(self):
"""
An object for managing a swarm on the server. See the
:doc:`swarm documentation <swarm>` for full details.
"""
return Swarm(client=self)
@property
def volumes(self):
"""
An object for managing volumes on the server. See the
:doc:`volumes documentation <volumes>` for full details.
"""
return VolumeCollection(client=self)
# Top-level methods
def events(self, *args, **kwargs):
return self.api.events(*args, **kwargs)
events.__doc__ = APIClient.events.__doc__
def info(self, *args, **kwargs):
return self.api.info(*args, **kwargs)
info.__doc__ = APIClient.info.__doc__
def login(self, *args, **kwargs):
return self.api.login(*args, **kwargs)
login.__doc__ = APIClient.login.__doc__
def ping(self, *args, **kwargs):
return self.api.ping(*args, **kwargs)
ping.__doc__ = APIClient.ping.__doc__
def version(self, *args, **kwargs):
return self.api.version(*args, **kwargs)
version.__doc__ = APIClient.version.__doc__
from_env = Client.from_env
import requests
class APIError(requests.exceptions.HTTPError):
def __init__(self, message, response, explanation=None):
class DockerException(Exception):
"""
A base class from which all other exceptions inherit.
If you want to catch all errors that the Docker SDK might raise,
catch this base exception.
"""
def create_api_error_from_http_exception(e):
"""
Create a suitable APIError from requests.exceptions.HTTPError.
"""
response = e.response
try:
explanation = response.json()['message']
except ValueError:
explanation = response.content.strip()
cls = APIError
if response.status_code == 404:
if explanation and 'No such image' in str(explanation):
cls = ImageNotFound
else:
cls = NotFound
raise cls(e, response=response, explanation=explanation)
class APIError(requests.exceptions.HTTPError, DockerException):
"""
An HTTP error from the API.
"""
def __init__(self, message, response=None, explanation=None):
# requests 1.2 supports response as a keyword argument, but
# requests 1.1 doesn't
super(APIError, self).__init__(message)
self.response = response
self.explanation = explanation
if self.explanation is None and response.content:
try:
self.explanation = response.json()['message']
except ValueError:
self.explanation = response.content.strip()
def __str__(self):
message = super(APIError, self).__str__()
......@@ -32,18 +55,27 @@ class APIError(requests.exceptions.HTTPError):
return message
@property
def status_code(self):
if self.response:
return self.response.status_code
def is_client_error(self):
return 400 <= self.response.status_code < 500
if self.status_code is None:
return False
return 400 <= self.status_code < 500
def is_server_error(self):
return 500 <= self.response.status_code < 600
if self.status_code is None:
return False
return 500 <= self.status_code < 600
class DockerException(Exception):
class NotFound(APIError):
pass
class NotFound(APIError):
class ImageNotFound(NotFound):
pass
......@@ -76,3 +108,38 @@ class TLSParameterError(DockerException):
class NullResource(DockerException, ValueError):
pass
class ContainerError(DockerException):
"""
Represents a container that has exited with a non-zero exit code.
"""
def __init__(self, container, exit_status, command, image, stderr):
self.container = container
self.exit_status = exit_status
self.command = command
self.image = image
self.stderr = stderr
msg = ("Command '{}' in image '{}' returned non-zero exit status {}: "
"{}").format(command, image, exit_status, stderr)
super(ContainerError, self).__init__(msg)
class StreamParseError(RuntimeError):
def __init__(self, reason):
self.msg = reason
class BuildError(Exception):
pass
def create_unexpected_kwargs_error(name, kwargs):
quoted_kwargs = ["'{}'".format(k) for k in sorted(kwargs)]
text = ["{}() ".format(name)]
if len(quoted_kwargs) == 1:
text.append("got an unexpected keyword argument ")
else:
text.append("got unexpected keyword arguments ")
text.append(', '.join(quoted_kwargs))
return TypeError(''.join(text))
This diff is collapsed.
import re
import six
from ..api import APIClient
from ..errors import BuildError
from ..utils.json_stream import json_stream
from .resource import Collection, Model
class Image(Model):
"""
An image on the server.
"""
def __repr__(self):
return "<%s: '%s'>" % (self.__class__.__name__, "', '".join(self.tags))
@property
def short_id(self):
"""
The ID of the image truncated to 10 characters, plus the ``sha256:``
prefix.
"""
if self.id.startswith('sha256:'):
return self.id[:17]
return self.id[:10]
@property
def tags(self):
"""
The image's tags.
"""
return [
tag for tag in self.attrs.get('RepoTags', [])
if tag != '<none>:<none>'
]
def history(self):
"""
Show the history of an image.
Returns:
(str): The history of the image.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.client.api.history(self.id)
def save(self):
"""
Get a tarball of an image. Similar to the ``docker save`` command.
Returns:
(urllib3.response.HTTPResponse object): The response from the
daemon.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
>>> image = cli.get("fedora:latest")
>>> resp = image.save()
>>> f = open('/tmp/fedora-latest.tar', 'w')
>>> f.write(resp.data)
>>> f.close()
"""
return self.client.api.get_image(self.id)
def tag(self, repository, tag=None, **kwargs):
"""
Tag this image into a repository. Similar to the ``docker tag``
command.
Args:
repository (str): The repository to set for the tag
tag (str): The tag name
force (bool): Force
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Returns:
(bool): ``True`` if successful
"""
self.client.api.tag(self.id, repository, tag=tag, **kwargs)
class ImageCollection(Collection):
model = Image
def build(self, **kwargs):
"""
Build an image and return it. Similar to the ``docker build``
command. Either ``path`` or ``fileobj`` must be set.
If you have a tar file for the Docker build context (including a
Dockerfile) already, pass a readable file-like object to ``fileobj``
and also pass ``custom_context=True``. If the stream is compressed
also, set ``encoding`` to the correct value (e.g ``gzip``).
If you want to get the raw output of the build, use the
:py:meth:`~docker.api.build.BuildApiMixin.build` method in the
low-level API.
Args:
path (str): Path to the directory containing the Dockerfile
fileobj: A file object to use as the Dockerfile. (Or a file-like
object)
tag (str): A tag to add to the final image
quiet (bool): Whether to return the status
nocache (bool): Don't use the cache when set to ``True``
rm (bool): Remove intermediate containers. The ``docker build``
command now defaults to ``--rm=true``, but we have kept the old
default of `False` to preserve backward compatibility
stream (bool): *Deprecated for API version > 1.8 (always True)*.
Return a blocking generator you can iterate over to retrieve
build output as it happens
timeout (int): HTTP timeout
custom_context (bool): Optional if using ``fileobj``
encoding (str): The encoding for a stream. Set to ``gzip`` for
compressing
pull (bool): Downloads any updates to the FROM image in Dockerfiles
forcerm (bool): Always remove intermediate containers, even after
unsuccessful builds
dockerfile (str): path within the build context to the Dockerfile
buildargs (dict): A dictionary of build arguments
container_limits (dict): A dictionary of limits applied to each
container created by the build process. Valid keys:
- memory (int): set memory limit for build
- memswap (int): Total memory (memory + swap), -1 to disable
swap
- cpushares (int): CPU shares (relative weight)
- cpusetcpus (str): CPUs in which to allow execution, e.g.,
``"0-3"``, ``"0,1"``
decode (bool): If set to ``True``, the returned stream will be
decoded into dicts on the fly. Default ``False``.
Returns:
(:py:class:`Image`): The built image.
Raises:
:py:class:`docker.errors.BuildError`
If there is an error during the build.
:py:class:`docker.errors.APIError`
If the server returns any other error.
``TypeError``
If neither ``path`` nor ``fileobj`` is specified.
"""
resp = self.client.api.build(**kwargs)
if isinstance(resp, six.string_types):
return self.get(resp)
events = list(json_stream(resp))
if not events:
return BuildError('Unknown')
event = events[-1]
if 'stream' in event:
match = re.search(r'Successfully built ([0-9a-f]+)',
event.get('stream', ''))
if match:
image_id = match.group(1)
return self.get(image_id)
raise BuildError(event.get('error') or event)
def get(self, name):
"""
Gets an image.
Args:
name (str): The name of the image.
Returns:
(:py:class:`Image`): The image.
Raises:
:py:class:`docker.errors.ImageNotFound` If the image does not
exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.prepare_model(self.client.api.inspect_image(name))
def list(self, name=None, all=False, filters=None):
"""
List images on the server.
Args:
name (str): Only show images belonging to the repository ``name``
all (bool): Show intermediate image layers. By default, these are
filtered out.
filters (dict): Filters to be processed on the image list.
Available filters:
- ``dangling`` (bool)
- ``label`` (str): format either ``key`` or ``key=value``
Returns:
(list of :py:class:`Image`): The images.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
resp = self.client.api.images(name=name, all=all, filters=filters)
return [self.prepare_model(r) for r in resp]
def load(self, data):
"""
Load an image that was previously saved using
:py:meth:`~docker.models.images.Image.save` (or ``docker save``).
Similar to ``docker load``.
Args:
data (binary): Image data to be loaded.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.client.api.load_image(data)
def pull(self, name, **kwargs):
"""
Pull an image of the given name and return it. Similar to the
``docker pull`` command.
If you want to get the raw pull output, use the
:py:meth:`~docker.api.image.ImageApiMixin.pull` method in the
low-level API.
Args:
repository (str): The repository to pull
tag (str): The tag to pull
insecure_registry (bool): Use an insecure registry
auth_config (dict): Override the credentials that
:py:meth:`~docker.client.Client.login` has set for
this request. ``auth_config`` should contain the ``username``
and ``password`` keys to be valid.
Returns:
(:py:class:`Image`): The image that has been pulled.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
>>> image = client.images.pull('busybox')
"""
self.client.api.pull(name, **kwargs)
return self.get(name)
def push(self, repository, tag=None, **kwargs):
return self.client.api.push(repository, tag=tag, **kwargs)
push.__doc__ = APIClient.push.__doc__
def remove(self, *args, **kwargs):
self.client.api.remove_image(*args, **kwargs)
remove.__doc__ = APIClient.remove_image.__doc__
def search(self, *args, **kwargs):
return self.client.api.search(*args, **kwargs)
search.__doc__ = APIClient.search.__doc__
from .containers import Container
from .resource import Model, Collection
class Network(Model):
"""
A Docker network.
"""
@property
def name(self):
"""
The name of the network.
"""
return self.attrs.get('Name')
@property
def containers(self):
"""
The containers that are connected to the network, as a list of
:py:class:`~docker.models.containers.Container` objects.
"""
return [
self.client.containers.get(cid) for cid in
self.attrs.get('Containers', {}).keys()
]
def connect(self, container):
"""
Connect a container to this network.
Args:
container (str): Container to connect to this network, as either
an ID, name, or :py:class:`~docker.models.containers.Container`
object.
aliases (list): A list of aliases for this endpoint. Names in that
list can be used within the network to reach the container.
Defaults to ``None``.
links (list): A list of links for this endpoint. Containers
declared in this list will be linkedto this container.
Defaults to ``None``.
ipv4_address (str): The IP address of this container on the
network, using the IPv4 protocol. Defaults to ``None``.
ipv6_address (str): The IP address of this container on the
network, using the IPv6 protocol. Defaults to ``None``.
link_local_ips (list): A list of link-local (IPv4/IPv6) addresses.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
if isinstance(container, Container):
container = container.id
return self.client.api.connect_container_to_network(container, self.id)
def disconnect(self, container):
"""
Disconnect a container from this network.
Args:
container (str): Container to disconnect from this network, as
either an ID, name, or
:py:class:`~docker.models.containers.Container` object.
force (bool): Force the container to disconnect from a network.
Default: ``False``
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
if isinstance(container, Container):
container = container.id
return self.client.api.disconnect_container_from_network(container,
self.id)
def remove(self):
"""
Remove this network.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.client.api.remove_network(self.id)
class NetworkCollection(Collection):
"""
Networks on the Docker server.
"""
model = Network
def create(self, name, *args, **kwargs):
"""
Create a network. Similar to the ``docker network create``.
Args:
name (str): Name of the network
driver (str): Name of the driver used to create the network
options (dict): Driver options as a key-value dictionary
ipam (dict): Optional custom IP scheme for the network.
Created with :py:meth:`~docker.utils.create_ipam_config`.
check_duplicate (bool): Request daemon to check for networks with
same name. Default: ``True``.
internal (bool): Restrict external access to the network. Default
``False``.
labels (dict): Map of labels to set on the network. Default
``None``.
enable_ipv6 (bool): Enable IPv6 on the network. Default ``False``.
Returns:
(:py:class:`Network`): The network that was created.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
A network using the bridge driver:
>>> client.networks.create("network1", driver="bridge")
You can also create more advanced networks with custom IPAM
configurations. For example, setting the subnet to
``192.168.52.0/24`` and gateway address to ``192.168.52.254``.
.. code-block:: python
>>> ipam_pool = docker.utils.create_ipam_pool(
subnet='192.168.52.0/24',
gateway='192.168.52.254'
)
>>> ipam_config = docker.utils.create_ipam_config(
pool_configs=[ipam_pool]
)
>>> client.networks.create(
"network1",
driver="bridge",
ipam=ipam_config
)
"""
resp = self.client.api.create_network(name, *args, **kwargs)
return self.get(resp['Id'])
def get(self, network_id):
"""
Get a network by its ID.
Args:
network_id (str): The ID of the network.
Returns:
(:py:class:`Network`) The network.
Raises:
:py:class:`docker.errors.NotFound`
If the network does not exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.prepare_model(self.client.api.inspect_network(network_id))
def list(self, *args, **kwargs):
"""
List networks. Similar to the ``docker networks ls`` command.
Args:
names (list): List of names to filter by.
ids (list): List of ids to filter by.
Returns:
(list of :py:class:`Network`) The networks on the server.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
resp = self.client.api.networks(*args, **kwargs)
return [self.prepare_model(item) for item in resp]
from .resource import Model, Collection
class Node(Model):
"""A node in a swarm."""
id_attribute = 'ID'
@property
def version(self):
"""
The version number of the service. If this is not the same as the
server, the :py:meth:`update` function will not work and you will
need to call :py:meth:`reload` before calling it again.
"""
return self.attrs.get('Version').get('Index')
def update(self, node_spec):
"""
Update the node's configuration.
Args:
node_spec (dict): Configuration settings to update. Any values
not provided will be removed. Default: ``None``
Returns:
`True` if the request went through.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
>>> node_spec = {'Availability': 'active',
'Name': 'node-name',
'Role': 'manager',
'Labels': {'foo': 'bar'}
}
>>> node.update(node_spec)
"""
return self.client.api.update_node(self.id, self.version, node_spec)
class NodeCollection(Collection):
"""Nodes on the Docker server."""
model = Node
def get(self, node_id):
"""
Get a node.
Args:
node_id (string): ID of the node to be inspected.
Returns:
A :py:class:`Node` object.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.prepare_model(self.client.api.inspect_node(node_id))
def list(self, *args, **kwargs):
"""
List swarm nodes.
Args:
filters (dict): Filters to process on the nodes list. Valid
filters: ``id``, ``name``, ``membership`` and ``role``.
Default: ``None``
Returns:
A list of :py:class:`Node` objects.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
>>> client.nodes.list(filters={'role': 'manager'})
"""
return [
self.prepare_model(n)
for n in self.client.api.nodes(*args, **kwargs)
]
class Model(object):
"""
A base class for representing a single object on the server.
"""
id_attribute = 'Id'
def __init__(self, attrs=None, client=None, collection=None):
#: A client pointing at the server that this object is on.
self.client = client
#: The collection that this model is part of.
self.collection = collection
#: The raw representation of this object from the API
self.attrs = attrs
if self.attrs is None:
self.attrs = {}
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self.short_id)
def __eq__(self, other):
return isinstance(other, self.__class__) and self.id == other.id
@property
def id(self):
"""
The ID of the object.
"""
return self.attrs.get(self.id_attribute)
@property
def short_id(self):
"""
The ID of the object, truncated to 10 characters.
"""
return self.id[:10]
def reload(self):
"""
Load this object from the server again and update ``attrs`` with the
new data.
"""
new_model = self.collection.get(self.id)
self.attrs = new_model.attrs
class Collection(object):
"""
A base class for representing all objects of a particular type on the
server.
"""
#: The type of object this collection represents, set by subclasses
model = None
def __init__(self, client=None):
#: The client pointing at the server that this collection of objects
#: is on.
self.client = client
def list(self):
raise NotImplementedError
def get(self, key):
raise NotImplementedError
def create(self, attrs=None):
raise NotImplementedError
def prepare_model(self, attrs):
"""
Create a model from a set of attributes.
"""
if isinstance(attrs, Model):
attrs.client = self.client
attrs.collection = self
return attrs
elif isinstance(attrs, dict):
return self.model(attrs=attrs, client=self.client, collection=self)
else:
raise Exception("Can't create %s from %s" %
(self.model.__name__, attrs))
import copy
from docker.errors import create_unexpected_kwargs_error
from docker.types import TaskTemplate, ContainerSpec
from .resource import Model, Collection
class Service(Model):
"""A service."""
id_attribute = 'ID'
@property
def name(self):
"""The service's name."""
return self.attrs['Spec']['Name']
@property
def version(self):
"""
The version number of the service. If this is not the same as the
server, the :py:meth:`update` function will not work and you will
need to call :py:meth:`reload` before calling it again.
"""
return self.attrs.get('Version').get('Index')
def remove(self):
"""
Stop and remove the service.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.client.api.remove_service(self.id)
def tasks(self, filters=None):
"""
List the tasks in this service.
Args:
filters (dict): A map of filters to process on the tasks list.
Valid filters: ``id``, ``name``, ``node``,
``label``, and ``desired-state``.
Returns:
(list): List of task dictionaries.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
if filters is None:
filters = {}
filters['service'] = self.id
return self.client.api.tasks(filters=filters)
def update(self, **kwargs):
"""
Update a service's configuration. Similar to the ``docker service
update`` command.
Takes the same parameters as :py:meth:`~ServiceCollection.create`.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
# Image is required, so if it hasn't been set, use current image
if 'image' not in kwargs:
spec = self.attrs['Spec']['TaskTemplate']['ContainerSpec']
kwargs['image'] = spec['Image']
create_kwargs = _get_create_service_kwargs('update', kwargs)
return self.client.api.update_service(
self.id,
self.version,
**create_kwargs
)
class ServiceCollection(Collection):
"""Services on the Docker server."""
model = Service
def create(self, image, command=None, **kwargs):
"""
Create a service. Similar to the ``docker service create`` command.
Args:
image (str): The image name to use for the containers.
command (list of str or str): Command to run.
args (list of str): Arguments to the command.
constraints (list of str): Placement constraints.
container_labels (dict): Labels to apply to the container.
endpoint_spec (dict): Properties that can be configured to
access and load balance a service. Default: ``None``.
env (list of str): Environment variables, in the form
``KEY=val``.
labels (dict): Labels to apply to the service.
log_driver (str): Log driver to use for containers.
log_driver_options (dict): Log driver options.
mode (string): Scheduling mode for the service (``replicated`` or
``global``). Defaults to ``replicated``.
mounts (list of str): Mounts for the containers, in the form
``source:target:options``, where options is either
``ro`` or ``rw``.
name (str): Name to give to the service.
networks (list): List of network names or IDs to attach the
service to. Default: ``None``.
resources (dict): Resource limits and reservations. For the
format, see the Remote API documentation.
restart_policy (dict): Restart policy for containers. For the
format, see the Remote API documentation.
stop_grace_period (int): Amount of time to wait for
containers to terminate before forcefully killing them.
update_config (dict): Specification for the update strategy of the
service. Default: ``None``
user (str): User to run commands as.
workdir (str): Working directory for commands to run.
Returns:
(:py:class:`Service`) The created service.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
kwargs['image'] = image
kwargs['command'] = command
create_kwargs = _get_create_service_kwargs('create', kwargs)
service_id = self.client.api.create_service(**create_kwargs)
return self.get(service_id)
def get(self, service_id):
"""
Get a service.
Args:
service_id (str): The ID of the service.
Returns:
(:py:class:`Service`): The service.
Raises:
:py:class:`docker.errors.NotFound`
If the service does not exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.prepare_model(self.client.api.inspect_service(service_id))
def list(self, **kwargs):
"""
List services.
Args:
filters (dict): Filters to process on the nodes list. Valid
filters: ``id`` and ``name``. Default: ``None``.
Returns:
(list of :py:class:`Service`): The services.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return [
self.prepare_model(s)
for s in self.client.api.services(**kwargs)
]
# kwargs to copy straight over to ContainerSpec
CONTAINER_SPEC_KWARGS = [
'image',
'command',
'args',
'env',
'workdir',
'user',
'labels',
'mounts',
'stop_grace_period',
]
# kwargs to copy straight over to TaskTemplate
TASK_TEMPLATE_KWARGS = [
'resources',
'restart_policy',
]
# kwargs to copy straight over to create_service
CREATE_SERVICE_KWARGS = [
'name',
'labels',
'mode',
'update_config',
'networks',
'endpoint_spec',
]
def _get_create_service_kwargs(func_name, kwargs):
# Copy over things which can be copied directly
create_kwargs = {}
for key in copy.copy(kwargs):
if key in CREATE_SERVICE_KWARGS:
create_kwargs[key] = kwargs.pop(key)
container_spec_kwargs = {}
for key in copy.copy(kwargs):
if key in CONTAINER_SPEC_KWARGS:
container_spec_kwargs[key] = kwargs.pop(key)
task_template_kwargs = {}
for key in copy.copy(kwargs):
if key in TASK_TEMPLATE_KWARGS:
task_template_kwargs[key] = kwargs.pop(key)
if 'container_labels' in kwargs:
container_spec_kwargs['labels'] = kwargs.pop('container_labels')
if 'constraints' in kwargs:
task_template_kwargs['placement'] = {
'Constraints': kwargs.pop('constraints')
}
if 'log_driver' in kwargs:
task_template_kwargs['log_driver'] = {
'Name': kwargs.pop('log_driver'),
'Options': kwargs.pop('log_driver_options', {})
}
# All kwargs should have been consumed by this point, so raise
# error if any are left
if kwargs:
raise create_unexpected_kwargs_error(func_name, kwargs)
container_spec = ContainerSpec(**container_spec_kwargs)
task_template_kwargs['container_spec'] = container_spec
create_kwargs['task_template'] = TaskTemplate(**task_template_kwargs)
return create_kwargs
from docker.api import APIClient
from docker.errors import APIError
from docker.types import SwarmSpec
from .resource import Model
class Swarm(Model):
"""
The server's Swarm state. This a singleton that must be reloaded to get
the current state of the Swarm.
"""
def __init__(self, *args, **kwargs):
super(Swarm, self).__init__(*args, **kwargs)
if self.client:
try:
self.reload()
except APIError as e:
if e.response.status_code != 406:
raise
@property
def version(self):
"""
The version number of the swarm. If this is not the same as the
server, the :py:meth:`update` function will not work and you will
need to call :py:meth:`reload` before calling it again.
"""
return self.attrs.get('Version').get('Index')
def init(self, advertise_addr=None, listen_addr='0.0.0.0:2377',
force_new_cluster=False, swarm_spec=None, **kwargs):
"""
Initialize a new swarm on this Engine.
Args:
advertise_addr (str): Externally reachable address advertised to
other nodes. This can either be an address/port combination in
the form ``192.168.1.1:4567``, or an interface followed by a
port number, like ``eth0:4567``. If the port number is omitted,
the port number from the listen address is used.
If not specified, it will be automatically detected when
possible.
listen_addr (str): Listen address used for inter-manager
communication, as well as determining the networking interface
used for the VXLAN Tunnel Endpoint (VTEP). This can either be
an address/port combination in the form ``192.168.1.1:4567``,
or an interface followed by a port number, like ``eth0:4567``.
If the port number is omitted, the default swarm listening port
is used. Default: ``0.0.0.0:2377``
force_new_cluster (bool): Force creating a new Swarm, even if
already part of one. Default: False
task_history_retention_limit (int): Maximum number of tasks
history stored.
snapshot_interval (int): Number of logs entries between snapshot.
keep_old_snapshots (int): Number of snapshots to keep beyond the
current snapshot.
log_entries_for_slow_followers (int): Number of log entries to
keep around to sync up slow followers after a snapshot is
created.
heartbeat_tick (int): Amount of ticks (in seconds) between each
heartbeat.
election_tick (int): Amount of ticks (in seconds) needed without a
leader to trigger a new election.
dispatcher_heartbeat_period (int): The delay for an agent to send
a heartbeat to the dispatcher.
node_cert_expiry (int): Automatic expiry for nodes certificates.
external_ca (dict): Configuration for forwarding signing requests
to an external certificate authority. Use
``docker.types.SwarmExternalCA``.
name (string): Swarm's name
Returns:
``True`` if the request went through.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
>>> client.swarm.init(
advertise_addr='eth0', listen_addr='0.0.0.0:5000',
force_new_cluster=False, snapshot_interval=5000,
log_entries_for_slow_followers=1200
)
"""
init_kwargs = {}
for arg in ['advertise_addr', 'listen_addr', 'force_new_cluster']:
if arg in kwargs:
init_kwargs[arg] = kwargs[arg]
del kwargs[arg]
init_kwargs['swarm_spec'] = SwarmSpec(**kwargs)
self.client.api.init_swarm(**init_kwargs)
self.reload()
def join(self, *args, **kwargs):
return self.client.api.join_swarm(*args, **kwargs)
join.__doc__ = APIClient.join_swarm.__doc__
def leave(self, *args, **kwargs):
return self.client.api.leave_swarm(*args, **kwargs)
leave.__doc__ = APIClient.leave_swarm.__doc__
def reload(self):
"""
Inspect the swarm on the server and store the response in
:py:attr:`attrs`.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
self.attrs = self.client.api.inspect_swarm()
def update(self, rotate_worker_token=False, rotate_manager_token=False,
**kwargs):
"""
Update the swarm's configuration.
It takes the same arguments as :py:meth:`init`, except
``advertise_addr``, ``listen_addr``, and ``force_new_cluster``. In
addition, it takes these arguments:
Args:
rotate_worker_token (bool): Rotate the worker join token. Default:
``False``.
rotate_manager_token (bool): Rotate the manager join token.
Default: ``False``.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
# this seems to have to be set
if kwargs.get('node_cert_expiry') is None:
kwargs['node_cert_expiry'] = 7776000000000000
return self.client.api.update_swarm(
version=self.version,
swarm_spec=SwarmSpec(**kwargs),
rotate_worker_token=rotate_worker_token,
rotate_manager_token=rotate_manager_token
)
from .resource import Model, Collection
class Volume(Model):
"""A volume."""
id_attribute = 'Name'
@property
def name(self):
"""The name of the volume."""
return self.attrs['Name']
def remove(self):
"""Remove this volume."""
return self.client.api.remove_volume(self.id)
class VolumeCollection(Collection):
"""Volumes on the Docker server."""
model = Volume
def create(self, name, **kwargs):
"""
Create a volume.
Args:
name (str): Name of the volume
driver (str): Name of the driver used to create the volume
driver_opts (dict): Driver options as a key-value dictionary
labels (dict): Labels to set on the volume
Returns:
(:py:class:`Volume`): The volume created.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
>>> volume = client.volumes.create(name='foobar', driver='local',
driver_opts={'foo': 'bar', 'baz': 'false'},
labels={"key": "value"})
"""
obj = self.client.api.create_volume(name, **kwargs)
return self.prepare_model(obj)
def get(self, volume_id):
"""
Get a volume.
Args:
volume_id (str): Volume name.
Returns:
(:py:class:`Volume`): The volume.
Raises:
:py:class:`docker.errors.NotFound`
If the volume does not exist.
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
return self.prepare_model(self.client.api.inspect_volume(volume_id))
def list(self, **kwargs):
"""
List volumes. Similar to the ``docker volume ls`` command.
Args:
filters (dict): Server-side list filtering options.
Returns:
(list of :py:class:`Volume`): The volumes.
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
"""
resp = self.client.api.volumes(**kwargs)
if not resp.get('Volumes'):
return []
return [self.prepare_model(obj) for obj in resp['Volumes']]
from __future__ import absolute_import
from __future__ import unicode_literals
import json
import json.decoder
import six
from ..errors import StreamParseError
json_decoder = json.JSONDecoder()
def stream_as_text(stream):
"""Given a stream of bytes or text, if any of the items in the stream
are bytes convert them to text.
This function can be removed once docker-py returns text streams instead
of byte streams.
"""
for data in stream:
if not isinstance(data, six.text_type):
data = data.decode('utf-8', 'replace')
yield data
def json_splitter(buffer):
"""Attempt to parse a json object from a buffer. If there is at least one
object, return it and the rest of the buffer, otherwise return None.
"""
buffer = buffer.strip()
try:
obj, index = json_decoder.raw_decode(buffer)
rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
return obj, rest
except ValueError:
return None
def json_stream(stream):
"""Given a stream of text, return a stream of json objects.
This handles streams which are inconsistently buffered (some entries may
be newline delimited, and others are not).
"""
return split_buffer(stream, json_splitter, json_decoder.decode)
def line_splitter(buffer, separator=u'\n'):
index = buffer.find(six.text_type(separator))
if index == -1:
return None
return buffer[:index + 1], buffer[index + 1:]
def split_buffer(stream, splitter=None, decoder=lambda a: a):
"""Given a generator which yields strings and a splitter function,
joins all input, splits on the separator and yields each chunk.
Unlike string.split(), each chunk includes the trailing
separator, except for the last one if none was found on the end
of the input.
"""
splitter = splitter or line_splitter
buffered = six.text_type('')
for data in stream_as_text(stream):
buffered += data
while True:
buffer_split = splitter(buffered)
if buffer_split is None:
break
item, buffered = buffer_split
yield item
if buffered:
try:
yield decoder(buffered)
except Exception as e:
raise StreamParseError(e)
......@@ -61,3 +61,16 @@ def wait_on_condition(condition, delay=0.1, timeout=40):
def random_name():
return u'dockerpytest_{0:x}'.format(random.getrandbits(64))
def force_leave_swarm(client):
"""Actually force leave a Swarm. There seems to be a bug in Swarm that
occasionally throws "context deadline exceeded" errors when leaving."""
while True:
try:
return client.swarm.leave(force=True)
except docker.errors.APIError as e:
if e.explanation == "context deadline exceeded":
continue
else:
raise
import unittest
import docker
class ClientTest(unittest.TestCase):
def test_info(self):
client = docker.from_env()
info = client.info()
assert 'ID' in info
assert 'Name' in info
def test_ping(self):
client = docker.from_env()
assert client.ping() is True
def test_version(self):
client = docker.from_env()
assert 'Version' in client.version()
import docker
from .base import BaseIntegrationTest
class ContainerCollectionTest(BaseIntegrationTest):
def test_run(self):
client = docker.from_env()
self.assertEqual(
client.containers.run("alpine", "echo hello world", remove=True),
b'hello world\n'
)
def test_run_detach(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 300", detach=True)
self.tmp_containers.append(container.id)
assert container.attrs['Config']['Image'] == "alpine"
assert container.attrs['Config']['Cmd'] == ['sleep', '300']
def test_run_with_error(self):
client = docker.from_env()
with self.assertRaises(docker.errors.ContainerError) as cm:
client.containers.run("alpine", "cat /test", remove=True)
assert cm.exception.exit_status == 1
assert "cat /test" in str(cm.exception)
assert "alpine" in str(cm.exception)
assert "No such file or directory" in str(cm.exception)
def test_run_with_image_that_does_not_exist(self):
client = docker.from_env()
with self.assertRaises(docker.errors.ImageNotFound):
client.containers.run("dockerpytest_does_not_exist")
def test_get(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 300", detach=True)
self.tmp_containers.append(container.id)
assert client.containers.get(container.id).attrs[
'Config']['Image'] == "alpine"
def test_list(self):
client = docker.from_env()
container_id = client.containers.run(
"alpine", "sleep 300", detach=True).id
self.tmp_containers.append(container_id)
containers = [c for c in client.containers.list() if c.id ==
container_id]
assert len(containers) == 1
container = containers[0]
assert container.attrs['Config']['Image'] == 'alpine'
container.kill()
container.remove()
assert container_id not in [c.id for c in client.containers.list()]
class ContainerTest(BaseIntegrationTest):
def test_commit(self):
client = docker.from_env()
container = client.containers.run(
"alpine", "sh -c 'echo \"hello\" > /test'",
detach=True
)
self.tmp_containers.append(container.id)
container.wait()
image = container.commit()
self.assertEqual(
client.containers.run(image.id, "cat /test", remove=True),
b"hello\n"
)
def test_diff(self):
client = docker.from_env()
container = client.containers.run("alpine", "touch /test", detach=True)
self.tmp_containers.append(container.id)
container.wait()
assert container.diff() == [{'Path': '/test', 'Kind': 1}]
def test_exec_run(self):
client = docker.from_env()
container = client.containers.run(
"alpine", "sh -c 'echo \"hello\" > /test; sleep 60'", detach=True
)
self.tmp_containers.append(container.id)
assert container.exec_run("cat /test") == b"hello\n"
def test_kill(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 300", detach=True)
self.tmp_containers.append(container.id)
while container.status != 'running':
container.reload()
assert container.status == 'running'
container.kill()
container.reload()
assert container.status == 'exited'
def test_logs(self):
client = docker.from_env()
container = client.containers.run("alpine", "echo hello world",
detach=True)
self.tmp_containers.append(container.id)
container.wait()
assert container.logs() == b"hello world\n"
def test_pause(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 300", detach=True)
self.tmp_containers.append(container.id)
container.pause()
container.reload()
assert container.status == "paused"
container.unpause()
container.reload()
assert container.status == "running"
def test_remove(self):
client = docker.from_env()
container = client.containers.run("alpine", "echo hello", detach=True)
self.tmp_containers.append(container.id)
assert container.id in [c.id for c in client.containers.list(all=True)]
container.wait()
container.remove()
containers = client.containers.list(all=True)
assert container.id not in [c.id for c in containers]
def test_rename(self):
client = docker.from_env()
container = client.containers.run("alpine", "echo hello", name="test1",
detach=True)
self.tmp_containers.append(container.id)
assert container.name == "test1"
container.rename("test2")
container.reload()
assert container.name == "test2"
def test_restart(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 100", detach=True)
self.tmp_containers.append(container.id)
first_started_at = container.attrs['State']['StartedAt']
container.restart()
container.reload()
second_started_at = container.attrs['State']['StartedAt']
assert first_started_at != second_started_at
def test_start(self):
client = docker.from_env()
container = client.containers.create("alpine", "sleep 50", detach=True)
self.tmp_containers.append(container.id)
assert container.status == "created"
container.start()
container.reload()
assert container.status == "running"
def test_stats(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 100", detach=True)
self.tmp_containers.append(container.id)
stats = container.stats(stream=False)
for key in ['read', 'networks', 'precpu_stats', 'cpu_stats',
'memory_stats', 'blkio_stats']:
assert key in stats
def test_stop(self):
client = docker.from_env()
container = client.containers.run("alpine", "top", detach=True)
self.tmp_containers.append(container.id)
assert container.status in ("running", "created")
container.stop(timeout=2)
container.reload()
assert container.status == "exited"
def test_top(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 60", detach=True)
self.tmp_containers.append(container.id)
top = container.top()
assert len(top['Processes']) == 1
assert 'sleep 60' in top['Processes'][0]
def test_update(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 60", detach=True,
cpu_shares=2)
self.tmp_containers.append(container.id)
assert container.attrs['HostConfig']['CpuShares'] == 2
container.update(cpu_shares=3)
container.reload()
assert container.attrs['HostConfig']['CpuShares'] == 3
def test_wait(self):
client = docker.from_env()
container = client.containers.run("alpine", "sh -c 'exit 0'",
detach=True)
self.tmp_containers.append(container.id)
assert container.wait() == 0
container = client.containers.run("alpine", "sh -c 'exit 1'",
detach=True)
self.tmp_containers.append(container.id)
assert container.wait() == 1
import io
import docker
from .base import BaseIntegrationTest
class ImageCollectionTest(BaseIntegrationTest):
def test_build(self):
client = docker.from_env()
image = client.images.build(fileobj=io.BytesIO(
"FROM alpine\n"
"CMD echo hello world".encode('ascii')
))
self.tmp_imgs.append(image.id)
assert client.containers.run(image) == b"hello world\n"
def test_build_with_error(self):
client = docker.from_env()
with self.assertRaises(docker.errors.BuildError) as cm:
client.images.build(fileobj=io.BytesIO(
"FROM alpine\n"
"NOTADOCKERFILECOMMAND".encode('ascii')
))
assert str(cm.exception) == ("Unknown instruction: "
"NOTADOCKERFILECOMMAND")
def test_list(self):
client = docker.from_env()
image = client.images.pull('alpine:latest')
assert image.id in get_ids(client.images.list())
def test_list_with_repository(self):
client = docker.from_env()
image = client.images.pull('alpine:latest')
assert image.id in get_ids(client.images.list('alpine'))
assert image.id in get_ids(client.images.list('alpine:latest'))
def test_pull(self):
client = docker.from_env()
image = client.images.pull('alpine:latest')
assert 'alpine:latest' in image.attrs['RepoTags']
class ImageTest(BaseIntegrationTest):
def test_tag_and_remove(self):
repo = 'dockersdk.tests.images.test_tag'
tag = 'some-tag'
identifier = '{}:{}'.format(repo, tag)
client = docker.from_env()
image = client.images.pull('alpine:latest')
image.tag(repo, tag)
self.tmp_imgs.append(identifier)
assert image.id in get_ids(client.images.list(repo))
assert image.id in get_ids(client.images.list(identifier))
client.images.remove(identifier)
assert image.id not in get_ids(client.images.list(repo))
assert image.id not in get_ids(client.images.list(identifier))
assert image.id in get_ids(client.images.list('alpine:latest'))
def get_ids(images):
return [i.id for i in images]
import docker
from .. import helpers
from .base import BaseIntegrationTest
class ImageCollectionTest(BaseIntegrationTest):
def test_create(self):
client = docker.from_env()
name = helpers.random_name()
network = client.networks.create(name, labels={'foo': 'bar'})
self.tmp_networks.append(network.id)
assert network.name == name
assert network.attrs['Labels']['foo'] == "bar"
def test_get(self):
client = docker.from_env()
name = helpers.random_name()
network_id = client.networks.create(name).id
self.tmp_networks.append(network_id)
network = client.networks.get(network_id)
assert network.name == name
def test_list_remove(self):
client = docker.from_env()
name = helpers.random_name()
network = client.networks.create(name)
self.tmp_networks.append(network.id)
assert network.id in [n.id for n in client.networks.list()]
assert network.id not in [
n.id for n in
client.networks.list(ids=["fdhjklfdfdshjkfds"])
]
assert network.id in [
n.id for n in
client.networks.list(ids=[network.id])
]
assert network.id not in [
n.id for n in
client.networks.list(names=["fdshjklfdsjhkl"])
]
assert network.id in [
n.id for n in
client.networks.list(names=[name])
]
network.remove()
assert network.id not in [n.id for n in client.networks.list()]
class ImageTest(BaseIntegrationTest):
def test_connect_disconnect(self):
client = docker.from_env()
network = client.networks.create(helpers.random_name())
self.tmp_networks.append(network.id)
container = client.containers.create("alpine", "sleep 300")
self.tmp_containers.append(container.id)
assert network.containers == []
network.connect(container)
container.start()
assert client.networks.get(network.id).containers == [container]
network.disconnect(container)
assert network.containers == []
assert client.networks.get(network.id).containers == []
import unittest
import docker
from .. import helpers
class NodesTest(unittest.TestCase):
def setUp(self):
helpers.force_leave_swarm(docker.from_env())
def tearDown(self):
helpers.force_leave_swarm(docker.from_env())
def test_list_get_update(self):
client = docker.from_env()
client.swarm.init()
nodes = client.nodes.list()
assert len(nodes) == 1
assert nodes[0].attrs['Spec']['Role'] == 'manager'
node = client.nodes.get(nodes[0].id)
assert node.id == nodes[0].id
assert node.attrs['Spec']['Role'] == 'manager'
assert node.version > 0
node = client.nodes.list()[0]
assert not node.attrs['Spec'].get('Labels')
node.update({
'Availability': 'active',
'Name': 'node-name',
'Role': 'manager',
'Labels': {'foo': 'bar'}
})
node.reload()
assert node.attrs['Spec']['Labels'] == {'foo': 'bar'}
import docker
from .base import BaseIntegrationTest
class ModelTest(BaseIntegrationTest):
def test_reload(self):
client = docker.from_env()
container = client.containers.run("alpine", "sleep 300", detach=True)
self.tmp_containers.append(container.id)
first_started_at = container.attrs['State']['StartedAt']
container.kill()
container.start()
assert container.attrs['State']['StartedAt'] == first_started_at
container.reload()
assert container.attrs['State']['StartedAt'] != first_started_at
import unittest
import docker
from .. import helpers
class ServiceTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
client = docker.from_env()
helpers.force_leave_swarm(client)
client.swarm.init()
@classmethod
def tearDownClass(cls):
helpers.force_leave_swarm(docker.from_env())
def test_create(self):
client = docker.from_env()
name = helpers.random_name()
service = client.services.create(
# create arguments
name=name,
labels={'foo': 'bar'},
# ContainerSpec arguments
image="alpine",
command="sleep 300",
container_labels={'container': 'label'}
)
assert service.name == name
assert service.attrs['Spec']['Labels']['foo'] == 'bar'
container_spec = service.attrs['Spec']['TaskTemplate']['ContainerSpec']
assert container_spec['Image'] == "alpine"
assert container_spec['Labels'] == {'container': 'label'}
def test_get(self):
client = docker.from_env()
name = helpers.random_name()
service = client.services.create(
name=name,
image="alpine",
command="sleep 300"
)
service = client.services.get(service.id)
assert service.name == name
def test_list_remove(self):
client = docker.from_env()
service = client.services.create(
name=helpers.random_name(),
image="alpine",
command="sleep 300"
)
assert service in client.services.list()
service.remove()
assert service not in client.services.list()
def test_tasks(self):
client = docker.from_env()
service1 = client.services.create(
name=helpers.random_name(),
image="alpine",
command="sleep 300"
)
service2 = client.services.create(
name=helpers.random_name(),
image="alpine",
command="sleep 300"
)
tasks = []
while len(tasks) == 0:
tasks = service1.tasks()
assert len(tasks) == 1
assert tasks[0]['ServiceID'] == service1.id
tasks = []
while len(tasks) == 0:
tasks = service2.tasks()
assert len(tasks) == 1
assert tasks[0]['ServiceID'] == service2.id
def test_update(self):
client = docker.from_env()
service = client.services.create(
# create arguments
name=helpers.random_name(),
# ContainerSpec arguments
image="alpine",
command="sleep 300"
)
new_name = helpers.random_name()
service.update(
# create argument
name=new_name,
# ContainerSpec argument
command="sleep 600"
)
service.reload()
assert service.name == new_name
container_spec = service.attrs['Spec']['TaskTemplate']['ContainerSpec']
assert container_spec['Command'] == ["sleep", "600"]
import unittest
import docker
from .. import helpers
class SwarmTest(unittest.TestCase):
def setUp(self):
helpers.force_leave_swarm(docker.from_env())
def tearDown(self):
helpers.force_leave_swarm(docker.from_env())
def test_init_update_leave(self):
client = docker.from_env()
client.swarm.init(snapshot_interval=5000)
assert client.swarm.attrs['Spec']['Raft']['SnapshotInterval'] == 5000
client.swarm.update(snapshot_interval=10000)
assert client.swarm.attrs['Spec']['Raft']['SnapshotInterval'] == 10000
assert client.swarm.leave(force=True)
with self.assertRaises(docker.errors.APIError) as cm:
client.swarm.reload()
assert cm.exception.response.status_code == 406
import docker
from .base import BaseIntegrationTest
class VolumesTest(BaseIntegrationTest):
def test_create_get(self):
client = docker.from_env()
volume = client.volumes.create(
'dockerpytest_1',
driver='local',
labels={'labelkey': 'labelvalue'}
)
self.tmp_volumes.append(volume.id)
assert volume.id
assert volume.name == 'dockerpytest_1'
assert volume.attrs['Labels'] == {'labelkey': 'labelvalue'}
volume = client.volumes.get(volume.id)
assert volume.name == 'dockerpytest_1'
def test_list_remove(self):
client = docker.from_env()
volume = client.volumes.create('dockerpytest_1')
self.tmp_volumes.append(volume.id)
assert volume in client.volumes.list()
assert volume in client.volumes.list(filters={'name': 'dockerpytest_'})
assert volume not in client.volumes.list(filters={'name': 'foobar'})
volume.remove()
assert volume not in client.volumes.list()
......@@ -27,7 +27,6 @@ except ImportError:
DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS
TEST_CERT_DIR = os.path.join(os.path.dirname(__file__), 'testdata/certs')
def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
......@@ -487,32 +486,6 @@ class UserAgentTest(unittest.TestCase):
self.assertEqual(headers['User-Agent'], 'foo/bar')
class FromEnvTest(unittest.TestCase):
def setUp(self):
self.os_environ = os.environ.copy()
def tearDown(self):
os.environ = self.os_environ
def test_from_env(self):
"""Test that environment variables are passed through to
utils.kwargs_from_env(). KwargsFromEnvTest tests that environment
variables are parsed correctly."""
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='1')
client = APIClient.from_env()
self.assertEqual(client.base_url, "https://192.168.59.103:2376")
def test_from_env_with_version(self):
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='1')
client = APIClient.from_env(version='2.32')
self.assertEqual(client.base_url, "https://192.168.59.103:2376")
self.assertEqual(client._version, '2.32')
class DisableSocketTest(unittest.TestCase):
class DummySocket(object):
def __init__(self, timeout=60):
......
import datetime
import docker
import os
import unittest
from . import fake_api
try:
from unittest import mock
except ImportError:
import mock
TEST_CERT_DIR = os.path.join(os.path.dirname(__file__), 'testdata/certs')
class ClientTest(unittest.TestCase):
@mock.patch('docker.api.APIClient.events')
def test_events(self, mock_func):
since = datetime.datetime(2016, 1, 1, 0, 0)
mock_func.return_value = fake_api.get_fake_events()[1]
client = docker.from_env()
assert client.events(since=since) == mock_func.return_value
mock_func.assert_called_with(since=since)
@mock.patch('docker.api.APIClient.info')
def test_info(self, mock_func):
mock_func.return_value = fake_api.get_fake_info()[1]
client = docker.from_env()
assert client.info() == mock_func.return_value
mock_func.assert_called_with()
@mock.patch('docker.api.APIClient.ping')
def test_ping(self, mock_func):
mock_func.return_value = True
client = docker.from_env()
assert client.ping() is True
mock_func.assert_called_with()
@mock.patch('docker.api.APIClient.version')
def test_version(self, mock_func):
mock_func.return_value = fake_api.get_fake_version()[1]
client = docker.from_env()
assert client.version() == mock_func.return_value
mock_func.assert_called_with()
class FromEnvTest(unittest.TestCase):
def setUp(self):
self.os_environ = os.environ.copy()
def tearDown(self):
os.environ = self.os_environ
def test_from_env(self):
"""Test that environment variables are passed through to
utils.kwargs_from_env(). KwargsFromEnvTest tests that environment
variables are parsed correctly."""
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='1')
client = docker.from_env()
self.assertEqual(client.api.base_url, "https://192.168.59.103:2376")
def test_from_env_with_version(self):
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='1')
client = docker.from_env(version='2.32')
self.assertEqual(client.api.base_url, "https://192.168.59.103:2376")
self.assertEqual(client.api._version, '2.32')
import unittest
from docker.errors import (APIError, DockerException,
create_unexpected_kwargs_error)
class APIErrorTest(unittest.TestCase):
def test_api_error_is_caught_by_dockerexception(self):
try:
raise APIError("this should be caught by DockerException")
except DockerException:
pass
class CreateUnexpectedKwargsErrorTest(unittest.TestCase):
def test_create_unexpected_kwargs_error_single(self):
e = create_unexpected_kwargs_error('f', {'foo': 'bar'})
assert str(e) == "f() got an unexpected keyword argument 'foo'"
def test_create_unexpected_kwargs_error_multiple(self):
e = create_unexpected_kwargs_error('f', {'foo': 'bar', 'baz': 'bosh'})
assert str(e) == "f() got unexpected keyword arguments 'baz', 'foo'"
......@@ -6,6 +6,7 @@ CURRENT_VERSION = 'v{0}'.format(constants.DEFAULT_DOCKER_API_VERSION)
FAKE_CONTAINER_ID = '3cc2351ab11b'
FAKE_IMAGE_ID = 'e9aa60c60128'
FAKE_EXEC_ID = 'd5d177f121dc'
FAKE_NETWORK_ID = '33fb6a3462b8'
FAKE_IMAGE_NAME = 'test_image'
FAKE_TARBALL_PATH = '/path/to/tarball'
FAKE_REPO_NAME = 'repo'
......@@ -46,6 +47,17 @@ def get_fake_info():
return status_code, response
def post_fake_auth():
status_code = 200
response = {'Status': 'Login Succeeded',
'IdentityToken': '9cbaf023786cd7'}
return status_code, response
def get_fake_ping():
return 200, "OK"
def get_fake_search():
status_code = 200
response = [{'Name': 'busybox', 'Description': 'Fake Description'}]
......@@ -125,7 +137,9 @@ def get_fake_inspect_container(tty=False):
'Config': {'Privileged': True, 'Tty': tty},
'ID': FAKE_CONTAINER_ID,
'Image': 'busybox:latest',
'Name': 'foobar',
"State": {
"Status": "running",
"Running": True,
"Pid": 0,
"ExitCode": 0,
......@@ -140,11 +154,11 @@ def get_fake_inspect_container(tty=False):
def get_fake_inspect_image():
status_code = 200
response = {
'id': FAKE_IMAGE_ID,
'parent': "27cf784147099545",
'created': "2013-03-23T22:24:18.818426-07:00",
'container': FAKE_CONTAINER_ID,
'container_config':
'Id': FAKE_IMAGE_ID,
'Parent': "27cf784147099545",
'Created': "2013-03-23T22:24:18.818426-07:00",
'Container': FAKE_CONTAINER_ID,
'ContainerConfig':
{
"Hostname": "",
"User": "",
......@@ -411,6 +425,61 @@ def post_fake_update_node():
return 200, None
def get_fake_network_list():
return 200, [{
"Name": "bridge",
"Id": FAKE_NETWORK_ID,
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": False,
"Internal": False,
"IPAM": {
"Driver": "default",
"Config": [
{
"Subnet": "172.17.0.0/16"
}
]
},
"Containers": {
FAKE_CONTAINER_ID: {
"EndpointID": "ed2419a97c1d99",
"MacAddress": "02:42:ac:11:00:02",
"IPv4Address": "172.17.0.2/16",
"IPv6Address": ""
}
},
"Options": {
"com.docker.network.bridge.default_bridge": "true",
"com.docker.network.bridge.enable_icc": "true",
"com.docker.network.bridge.enable_ip_masquerade": "true",
"com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
"com.docker.network.bridge.name": "docker0",
"com.docker.network.driver.mtu": "1500"
}
}]
def get_fake_network():
return 200, get_fake_network_list()[1][0]
def post_fake_network():
return 201, {"Id": FAKE_NETWORK_ID, "Warnings": []}
def delete_fake_network():
return 204, None
def post_fake_network_connect():
return 200, None
def post_fake_network_disconnect():
return 200, None
# Maps real api url to fake response callback
prefix = 'http+docker://localunixsocket'
if constants.IS_WINDOWS_PLATFORM:
......@@ -423,6 +492,10 @@ fake_responses = {
get_fake_version,
'{1}/{0}/info'.format(CURRENT_VERSION, prefix):
get_fake_info,
'{1}/{0}/auth'.format(CURRENT_VERSION, prefix):
post_fake_auth,
'{1}/{0}/_ping'.format(CURRENT_VERSION, prefix):
get_fake_ping,
'{1}/{0}/images/search'.format(CURRENT_VERSION, prefix):
get_fake_search,
'{1}/{0}/images/json'.format(CURRENT_VERSION, prefix):
......@@ -516,4 +589,24 @@ fake_responses = {
CURRENT_VERSION, prefix, FAKE_NODE_ID
), 'POST'):
post_fake_update_node,
('{1}/{0}/networks'.format(CURRENT_VERSION, prefix), 'GET'):
get_fake_network_list,
('{1}/{0}/networks/create'.format(CURRENT_VERSION, prefix), 'POST'):
post_fake_network,
('{1}/{0}/networks/{2}'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'GET'):
get_fake_network,
('{1}/{0}/networks/{2}'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'DELETE'):
delete_fake_network,
('{1}/{0}/networks/{2}/connect'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'POST'):
post_fake_network_connect,
('{1}/{0}/networks/{2}/disconnect'.format(
CURRENT_VERSION, prefix, FAKE_NETWORK_ID
), 'POST'):
post_fake_network_disconnect,
}
import copy
import docker
from . import fake_api
try:
from unittest import mock
except ImportError:
import mock
class CopyReturnMagicMock(mock.MagicMock):
"""
A MagicMock which deep copies every return value.
"""
def _mock_call(self, *args, **kwargs):
ret = super(CopyReturnMagicMock, self)._mock_call(*args, **kwargs)
if isinstance(ret, (dict, list)):
ret = copy.deepcopy(ret)
return ret
def make_fake_api_client():
"""
Returns non-complete fake APIClient.
This returns most of the default cases correctly, but most arguments that
change behaviour will not work.
"""
api_client = docker.APIClient()
mock_client = CopyReturnMagicMock(**{
'build.return_value': fake_api.FAKE_IMAGE_ID,
'commit.return_value': fake_api.post_fake_commit()[1],
'containers.return_value': fake_api.get_fake_containers()[1],
'create_container.return_value':
fake_api.post_fake_create_container()[1],
'create_host_config.side_effect': api_client.create_host_config,
'create_network.return_value': fake_api.post_fake_network()[1],
'exec_create.return_value': fake_api.post_fake_exec_create()[1],
'exec_start.return_value': fake_api.post_fake_exec_start()[1],
'images.return_value': fake_api.get_fake_images()[1],
'inspect_container.return_value':
fake_api.get_fake_inspect_container()[1],
'inspect_image.return_value': fake_api.get_fake_inspect_image()[1],
'inspect_network.return_value': fake_api.get_fake_network()[1],
'logs.return_value': 'hello world\n',
'networks.return_value': fake_api.get_fake_network_list()[1],
'start.return_value': None,
'wait.return_value': 0,
})
mock_client._version = docker.constants.DEFAULT_DOCKER_API_VERSION
return mock_client
def make_fake_client():
"""
Returns a Client with a fake APIClient.
"""
client = docker.Client()
client.api = make_fake_api_client()
return client
This diff is collapsed.
from docker.models.images import Image
import unittest
from .fake_api import FAKE_IMAGE_ID
from .fake_api_client import make_fake_client
class ImageCollectionTest(unittest.TestCase):
def test_build(self):
client = make_fake_client()
image = client.images.build()
client.api.build.assert_called_with()
client.api.inspect_image.assert_called_with(FAKE_IMAGE_ID)
assert isinstance(image, Image)
assert image.id == FAKE_IMAGE_ID
def test_get(self):
client = make_fake_client()
image = client.images.get(FAKE_IMAGE_ID)
client.api.inspect_image.assert_called_with(FAKE_IMAGE_ID)
assert isinstance(image, Image)
assert image.id == FAKE_IMAGE_ID
def test_list(self):
client = make_fake_client()
images = client.images.list(all=True)
client.api.images.assert_called_with(all=True, name=None, filters=None)
assert len(images) == 1
assert isinstance(images[0], Image)
assert images[0].id == FAKE_IMAGE_ID
def test_load(self):
client = make_fake_client()
client.images.load('byte stream')
client.api.load_image.assert_called_with('byte stream')
def test_pull(self):
client = make_fake_client()
image = client.images.pull('test_image')
client.api.pull.assert_called_with('test_image')
client.api.inspect_image.assert_called_with('test_image')
assert isinstance(image, Image)
assert image.id == FAKE_IMAGE_ID
def test_push(self):
client = make_fake_client()
client.images.push('foobar', insecure_registry=True)
client.api.push.assert_called_with(
'foobar',
tag=None,
insecure_registry=True
)
def test_remove(self):
client = make_fake_client()
client.images.remove('test_image')
client.api.remove_image.assert_called_with('test_image')
def test_search(self):
client = make_fake_client()
client.images.search('test')
client.api.search.assert_called_with('test')
class ImageTest(unittest.TestCase):
def test_short_id(self):
image = Image(attrs={'Id': 'sha256:b6846070672ce4e8f1f91564ea6782bd675'
'f69d65a6f73ef6262057ad0a15dcd'})
assert image.short_id == 'sha256:b684607067'
image = Image(attrs={'Id': 'b6846070672ce4e8f1f91564ea6782bd675'
'f69d65a6f73ef6262057ad0a15dcd'})
assert image.short_id == 'b684607067'
def test_tags(self):
image = Image(attrs={
'RepoTags': ['test_image:latest']
})
assert image.tags == ['test_image:latest']
image = Image(attrs={
'RepoTags': ['<none>:<none>']
})
assert image.tags == []
def test_history(self):
client = make_fake_client()
image = client.images.get(FAKE_IMAGE_ID)
image.history()
client.api.history.assert_called_with(FAKE_IMAGE_ID)
def test_save(self):
client = make_fake_client()
image = client.images.get(FAKE_IMAGE_ID)
image.save()
client.api.get_image.assert_called_with(FAKE_IMAGE_ID)
def test_tag(self):
client = make_fake_client()
image = client.images.get(FAKE_IMAGE_ID)
image.tag('foo')
client.api.tag.assert_called_with(FAKE_IMAGE_ID, 'foo', tag=None)
import unittest
from .fake_api import FAKE_NETWORK_ID, FAKE_CONTAINER_ID
from .fake_api_client import make_fake_client
class ImageCollectionTest(unittest.TestCase):
def test_create(self):
client = make_fake_client()
network = client.networks.create("foobar", labels={'foo': 'bar'})
assert network.id == FAKE_NETWORK_ID
assert client.api.inspect_network.called_once_with(FAKE_NETWORK_ID)
assert client.api.create_network.called_once_with(
"foobar",
labels={'foo': 'bar'}
)
def test_get(self):
client = make_fake_client()
network = client.networks.get(FAKE_NETWORK_ID)
assert network.id == FAKE_NETWORK_ID
assert client.api.inspect_network.called_once_with(FAKE_NETWORK_ID)
def test_list(self):
client = make_fake_client()
networks = client.networks.list()
assert networks[0].id == FAKE_NETWORK_ID
assert client.api.networks.called_once_with()
client = make_fake_client()
client.networks.list(ids=["abc"])
assert client.api.networks.called_once_with(ids=["abc"])
client = make_fake_client()
client.networks.list(names=["foobar"])
assert client.api.networks.called_once_with(names=["foobar"])
class ImageTest(unittest.TestCase):
def test_connect(self):
client = make_fake_client()
network = client.networks.get(FAKE_NETWORK_ID)
network.connect(FAKE_CONTAINER_ID)
assert client.api.connect_container_to_network.called_once_with(
FAKE_CONTAINER_ID,
FAKE_NETWORK_ID
)
def test_disconnect(self):
client = make_fake_client()
network = client.networks.get(FAKE_NETWORK_ID)
network.disconnect(FAKE_CONTAINER_ID)
assert client.api.disconnect_container_from_network.called_once_with(
FAKE_CONTAINER_ID,
FAKE_NETWORK_ID
)
def test_remove(self):
client = make_fake_client()
network = client.networks.get(FAKE_NETWORK_ID)
network.remove()
assert client.api.remove_network.called_once_with(FAKE_NETWORK_ID)
import unittest
from .fake_api import FAKE_CONTAINER_ID
from .fake_api_client import make_fake_client
class ModelTest(unittest.TestCase):
def test_reload(self):
client = make_fake_client()
container = client.containers.get(FAKE_CONTAINER_ID)
container.attrs['Name'] = "oldname"
container.reload()
assert client.api.inspect_container.call_count == 2
assert container.attrs['Name'] == "foobar"
import unittest
from docker.models.services import _get_create_service_kwargs
class CreateServiceKwargsTest(unittest.TestCase):
def test_get_create_service_kwargs(self):
kwargs = _get_create_service_kwargs('test', {
'image': 'foo',
'command': 'true',
'name': 'somename',
'labels': {'key': 'value'},
'mode': 'global',
'update_config': {'update': 'config'},
'networks': ['somenet'],
'endpoint_spec': {'blah': 'blah'},
'container_labels': {'containerkey': 'containervalue'},
'resources': {'foo': 'bar'},
'restart_policy': {'restart': 'policy'},
'log_driver': 'logdriver',
'log_driver_options': {'foo': 'bar'},
'args': ['some', 'args'],
'env': {'FOO': 'bar'},
'workdir': '/',
'user': 'bob',
'mounts': [{'some': 'mounts'}],
'stop_grace_period': 5,
'constraints': ['foo=bar'],
})
task_template = kwargs.pop('task_template')
assert kwargs == {
'name': 'somename',
'labels': {'key': 'value'},
'mode': 'global',
'update_config': {'update': 'config'},
'networks': ['somenet'],
'endpoint_spec': {'blah': 'blah'},
}
assert set(task_template.keys()) == set([
'ContainerSpec', 'Resources', 'RestartPolicy', 'Placement',
'LogDriver'
])
assert task_template['Placement'] == {'Constraints': ['foo=bar']}
assert task_template['LogDriver'] == {
'Name': 'logdriver',
'Options': {'foo': 'bar'}
}
assert set(task_template['ContainerSpec'].keys()) == set([
'Image', 'Command', 'Args', 'Env', 'Dir', 'User', 'Labels',
'Mounts', 'StopGracePeriod'
])
# encoding: utf-8
from __future__ import absolute_import
from __future__ import unicode_literals
from docker.utils.json_stream import json_splitter, stream_as_text, json_stream
class TestJsonSplitter(object):
def test_json_splitter_no_object(self):
data = '{"foo": "bar'
assert json_splitter(data) is None
def test_json_splitter_with_object(self):
data = '{"foo": "bar"}\n \n{"next": "obj"}'
assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
def test_json_splitter_leading_whitespace(self):
data = '\n \r{"foo": "bar"}\n\n {"next": "obj"}'
assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
class TestStreamAsText(object):
def test_stream_with_non_utf_unicode_character(self):
stream = [b'\xed\xf3\xf3']
output, = stream_as_text(stream)
assert output == '���'
def test_stream_with_utf_character(self):
stream = ['ěĝ'.encode('utf-8')]
output, = stream_as_text(stream)
assert output == 'ěĝ'
class TestJsonStream(object):
def test_with_falsy_entries(self):
stream = [
'{"one": "two"}\n{}\n',
"[1, 2, 3]\n[]\n",
]
output = list(json_stream(stream))
assert output == [
{'one': 'two'},
{},
[1, 2, 3],
[],
]
def test_with_leading_whitespace(self):
stream = [
'\n \r\n {"one": "two"}{"x": 1}',
' {"three": "four"}\t\t{"x": 2}'
]
output = list(json_stream(stream))
assert output == [
{'one': 'two'},
{'x': 1},
{'three': 'four'},
{'x': 2}
]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment