/*
 * A type which wraps a pipe handle in message oriented mode
 *
 * pipe_connection.c
 *
 * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
 */

#include "multiprocessing.h"

/*
 * Close the handle `h` with the Win32 CloseHandle() call.
 * NOTE(review): nothing in this file uses CLOSE directly; presumably it is
 * consumed by "connection.h", included at the end of this file -- confirm.
 */
#define CLOSE(h) CloseHandle(h)

/*
 * Send string to the pipe; assumes in message oriented mode
 *
 * Writes `length` bytes starting at `string` to conn->handle with a single
 * WriteFile() call made inside a Py_BEGIN_ALLOW_THREADS section.
 *
 * Returns MP_SUCCESS if WriteFile() succeeds.
 * Returns MP_EXCEPTION_HAS_BEEN_SET, with ValueError set, if the message
 * cannot be sent because of its size: either `length` does not fit in the
 * DWORD byte count WriteFile() takes, or WriteFile() failed with
 * ERROR_NO_SYSTEM_RESOURCES.
 * Returns MP_STANDARD_ERROR for any other WriteFile() failure.
 */

static Py_ssize_t
conn_send_string(ConnectionObject *conn, char *string, size_t length)
{
	DWORD amount_written;
	DWORD err = 0;
	BOOL ret;

	/* WriteFile() takes a DWORD count: refuse rather than silently
	   truncate the message when size_t is wider than DWORD. */
	if ((size_t)(DWORD)length != length) {
		PyErr_Format(PyExc_ValueError, "Cannot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
		return MP_EXCEPTION_HAS_BEEN_SET;
	}

	Py_BEGIN_ALLOW_THREADS
	ret = WriteFile(conn->handle, string, (DWORD)length, &amount_written, NULL);
	/* Read the error code right away, before re-acquiring the
	   interpreter lock has a chance to overwrite it. */
	if (!ret)
		err = GetLastError();
	Py_END_ALLOW_THREADS

	if (!ret && err == ERROR_NO_SYSTEM_RESOURCES) {
		PyErr_Format(PyExc_ValueError, "Cannot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
		/* A Python exception is already pending, so report that
		   (as conn_poll() does) instead of a generic error code. */
		return MP_EXCEPTION_HAS_BEEN_SET;
	}

	return ret ? MP_SUCCESS : MP_STANDARD_ERROR;
}

/*
 * Attempts to read into buffer, or if buffer too small into *newbuffer.
 *
 * Returns number of bytes read.  Assumes in message oriented mode.
 *
 * At most MIN(buflength, maxlength) bytes are read into `buffer`.  If
 * ReadFile() reports ERROR_MORE_DATA, the size of the rest of the message
 * is obtained with PeekNamedPipe(), a block large enough for the whole
 * message is allocated with PyMem_Malloc() and stored in *newbuffer, and
 * the complete message is assembled there.
 *
 * *newbuffer is NULL whenever the message fitted in `buffer` or an error
 * is returned.  When it is non-NULL this function does not free it
 * (presumably the caller releases it with PyMem_Free() -- confirm).
 *
 * Error returns:
 *   MP_END_OF_FILE         the first read failed with ERROR_BROKEN_PIPE
 *   MP_BAD_MESSAGE_LENGTH  the full message is longer than maxlength
 *   MP_MEMORY_ERROR        PyMem_Malloc() failed
 *   MP_STANDARD_ERROR      any other ReadFile()/PeekNamedPipe() failure
 */

static Py_ssize_t
conn_recv_string(ConnectionObject *conn, char *buffer, 
		 size_t buflength, char **newbuffer, size_t maxlength)
{
	DWORD left, length, full_length;
	DWORD err = 0;
	BOOL ret;

	*newbuffer = NULL;

	Py_BEGIN_ALLOW_THREADS
	ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength), 
		      &length, NULL);
	/* Read the error code right away, before re-acquiring the
	   interpreter lock has a chance to overwrite it. */
	if (!ret)
		err = GetLastError();
	Py_END_ALLOW_THREADS
	if (ret)
		return length;

	if (err != ERROR_MORE_DATA) {
		if (err == ERROR_BROKEN_PIPE)
			return MP_END_OF_FILE;
		return MP_STANDARD_ERROR;
	}

	/* The message did not fit: find out how much of it is still pending */
	if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
		return MP_STANDARD_ERROR;

	full_length = length + left;
	/* full_length < length means the DWORD addition wrapped around */
	if (full_length < length || full_length > maxlength)
		return MP_BAD_MESSAGE_LENGTH;

	*newbuffer = PyMem_Malloc(full_length);
	if (*newbuffer == NULL)
		return MP_MEMORY_ERROR;

	/* Keep the part already read, then append the remainder after it */
	memcpy(*newbuffer, buffer, length);

	Py_BEGIN_ALLOW_THREADS
	ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL);
	Py_END_ALLOW_THREADS
	if (!ret) {
		PyMem_Free(*newbuffer);
		/* do not hand a dangling pointer back to the caller */
		*newbuffer = NULL;
		return MP_STANDARD_ERROR;
	}

	assert(length == left);
	return full_length;
}

/*
 * Check whether any data is available for reading
 */

static int
93
conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
{
	DWORD bytes, deadline, delay;
	int difference, res;
	BOOL block = FALSE;

	if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
		return MP_STANDARD_ERROR;

	if (timeout == 0.0)
		return bytes > 0;

	if (timeout < 0.0)
		block = TRUE;
	else
		/* XXX does not check for overflow */
		deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);

	Sleep(0);

	for (delay = 1 ; ; delay += 1) {
		if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
			return MP_STANDARD_ERROR;
		else if (bytes > 0)
			return TRUE;

		if (!block) {
			difference = deadline - GetTickCount();
			if (difference < 0)
				return FALSE;
			if ((int)delay > difference)
				delay = difference;
		}

		if (delay > 20)
			delay = 20;

		Sleep(delay);

		/* check for signals */
		Py_BLOCK_THREADS 
		res = PyErr_CheckSignals();
		Py_UNBLOCK_THREADS

		if (res)
			return MP_EXCEPTION_HAS_BEEN_SET;
	}
}

/*
 * "connection.h" defines the PipeConnection type using the definitions above
 *
 * CONNECTION_NAME and CONNECTION_TYPE are defined immediately before the
 * include; presumably the header reads them to name the type it generates
 * -- confirm in "connection.h".
 */

#define CONNECTION_NAME "PipeConnection"
#define CONNECTION_TYPE PipeConnectionType

#include "connection.h"