Friday 24 July 2009

IOCP-based sockets with ctypes in Python: 3

Previous post: IOCP-based sockets with ctypes in Python: 2

The listen socket needs to be associated with an IO completion port. First I need to create one using CreateIoCompletionPort.

HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle,
__in_opt HANDLE ExistingCompletionPort,
__in ULONG_PTR CompletionKey,
__in DWORD NumberOfConcurrentThreads
);
Defined using ctypes in this way.
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, c_ulong, DWORD)
CreateIoCompletionPort.restype = HANDLE
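One caveat about the CompletionKey argument: ULONG_PTR is a pointer-sized unsigned integer, so c_ulong only matches it where a pointer is also 4 bytes, as on 32-bit Windows. The following is a minimal sketch of a width check, not part of the original script. The ULONG_PTR name is an alias introduced here purely for illustration, and the snippet uses nothing Windows-specific, so it can be run anywhere.

```python
from ctypes import c_size_t, c_ulong, c_void_p, sizeof

# Hypothetical alias: c_size_t is pointer-sized on the mainstream platforms
# ctypes supports, which is the property ULONG_PTR needs.
ULONG_PTR = c_size_t

pointer_bytes = sizeof(c_void_p)
print("pointer: %d bytes, ULONG_PTR alias: %d bytes, c_ulong: %d bytes"
      % (pointer_bytes, sizeof(ULONG_PTR), sizeof(c_ulong)))
```

If the alias were adopted, the argtypes tuple would presumably read (HANDLE, HANDLE, ULONG_PTR, DWORD), with the same type used wherever a completion key is passed or received.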
Now I can create the IO completion port.
NULL = c_ulong()
INVALID_HANDLE_VALUE = HANDLE(-1)
NULL_HANDLE = HANDLE(0)

hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL_HANDLE, NULL, NULL)
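# If HANDLE is a c_void_p (as in ctypes.wintypes), a NULL result arrives as None
# rather than 0, so test for falsiness instead of comparing against 0.
if not hIOCP:
    err = WSAGetLastError()
    closesocket(listenSocket)
    WSACleanup()
    raise WinError(err)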
The listen socket I created needs to be associated with that IO completion port.
CreateIoCompletionPort(listenSocket, hIOCP, NULL, NULL)
The created IO completion port needs to be closed at some point (when all the sockets associated with it have been closed) with CloseHandle.
BOOL WINAPI CloseHandle(
__in HANDLE hObject
);
Defined with ctypes.
CloseHandle = windll.kernel32.CloseHandle
CloseHandle.argtypes = (HANDLE,)
CloseHandle.restype = BOOL
Now I can add the AcceptEx definitions I previously wrote.
class _US(Structure):
    _fields_ = [
        ("Offset", DWORD),
        ("OffsetHigh", DWORD),
    ]

class _U(Union):
    _fields_ = [
        ("s", _US),
        ("Pointer", c_void_p),
    ]
    _anonymous_ = ("s",)

class OVERLAPPED(Structure):
    _fields_ = [
        ("Internal", POINTER(ULONG)),
        ("InternalHigh", POINTER(ULONG)),
        ("u", _U),
        ("hEvent", HANDLE),
        # Custom fields.
        ("channel", py_object),
    ]
    _anonymous_ = ("u",)

AcceptEx = windll.Mswsock.AcceptEx
AcceptEx.argtypes = (SOCKET, SOCKET, c_void_p, DWORD, DWORD, DWORD, POINTER(DWORD), POINTER(OVERLAPPED))
AcceptEx.restype = BOOL
In order to call AcceptEx, a socket needs to be precreated for the incoming connection to be assigned to.
ret = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, None, 0, WSA_FLAG_OVERLAPPED)
if ret == INVALID_SOCKET:
    err = WSAGetLastError()
    closesocket(listenSocket)
    CloseHandle(hIOCP)
    WSACleanup()
    raise WinError(err)

acceptSocket = ret
And then the AcceptEx call can be made.
FALSE = 0
ERROR_IO_PENDING = 997

dwReceiveDataLength = 0
dwLocalAddressLength = sizeof(sockaddr_in)/2
dwRemoteAddressLength = sizeof(sockaddr_in)/2
outputBuffer = create_string_buffer(2 * sizeof(sockaddr_in))
dwBytesReceived = DWORD()
ovAccept = OVERLAPPED()

ret = AcceptEx(listenSocket, acceptSocket, outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(dwBytesReceived), byref(ovAccept))
if ret == FALSE:
    err = WSAGetLastError()
    # The operation was successful and is currently in progress. Ignore this error...
    if err != ERROR_IO_PENDING:
        closesocket(acceptSocket)
        closesocket(listenSocket)
        CloseHandle(hIOCP)
        WSACleanup()
        raise WinError(err)
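The cleanup calls in these error paths grow by one line with every resource acquired. As a rough sketch of one way to keep them in a single place, the hypothetical Cleanup class below records undo steps as resources are created and runs them newest-first. It is not part of the original script, and plain list appends stand in for closesocket, CloseHandle and WSACleanup so that it runs on any platform.

```python
class Cleanup(object):
    """Hypothetical helper: remembers undo steps and runs them newest-first."""

    def __init__(self):
        self._steps = []

    def push(self, func, *args):
        # Record a callable and its arguments to be invoked later by run().
        self._steps.append((func, args))

    def run(self):
        # Undo in reverse order of acquisition, emptying the list as we go.
        while self._steps:
            func, args = self._steps.pop()
            func(*args)


# Stand-ins for the real Winsock/kernel32 calls: each step just logs its name.
log = []
cleanup = Cleanup()
cleanup.push(log.append, "WSACleanup()")
cleanup.push(log.append, "closesocket(listenSocket)")
cleanup.push(log.append, "CloseHandle(hIOCP)")
cleanup.push(log.append, "closesocket(acceptSocket)")

try:
    raise RuntimeError("simulated AcceptEx failure")
except RuntimeError:
    cleanup.run()

print(log)
```

Strict reverse order closes the completion port before the listen socket, which differs slightly from the order used in the script; either order should be workable for a test like this one.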
The accept socket also needs to be associated with the IO completion port.
CreateIoCompletionPort(acceptSocket, hIOCP, NULL, NULL)
Now incoming connections should be queued on the IO completion port for me to poll using GetQueuedCompletionStatus.
BOOL WINAPI GetQueuedCompletionStatus(
__in HANDLE CompletionPort,
__out LPDWORD lpNumberOfBytes,
__out PULONG_PTR lpCompletionKey,
__out LPOVERLAPPED *lpOverlapped,
__in DWORD dwMilliseconds
);
Specified with ctypes, this is:
GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong), POINTER(POINTER(OVERLAPPED)), DWORD)
GetQueuedCompletionStatus.restype = BOOL
For the purposes of writing a test script, I'm going to wait for 10 seconds for an incoming connection.
numberOfBytes = DWORD()
completionKey = c_ulong()
ovCompletedPtr = POINTER(OVERLAPPED)()

ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 10000)
if ret == FALSE:
    err = WSAGetLastError()
    closesocket(acceptSocket)
    closesocket(listenSocket)
    CloseHandle(hIOCP)
    WSACleanup()
    raise WinError(err)
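The check above treats every FALSE return the same way, although a timeout, a failure of the port itself and a dequeued-but-failed operation are different situations. Here is a sketch of how they could be told apart, assuming the documented behaviour that the overlapped pointer is left NULL when nothing was dequeued. The classify_completion helper and its return strings are made up for illustration, and pointers to c_int stand in for OVERLAPPED pointers so the snippet runs anywhere.

```python
from ctypes import POINTER, c_int, pointer

WAIT_TIMEOUT = 258  # the code behind "The wait operation timed out."


def classify_completion(ret, ov_ptr, err):
    """Hypothetical helper: name the outcome of one GetQueuedCompletionStatus call.

    ret is the BOOL result, ov_ptr the ctypes pointer the call filled in, and
    err the error code fetched right after the call (only used when ret is 0).
    """
    if ret:
        return "completed"
    if not ov_ptr:  # a NULL ctypes pointer is falsy: nothing was dequeued
        return "timeout" if err == WAIT_TIMEOUT else "port-error"
    return "failed-io"  # a packet was dequeued, but its operation failed


# Stand-in pointers: one NULL, one pointing at real memory.
null_ptr = POINTER(c_int)()
value = c_int(1)
live_ptr = pointer(value)

print(classify_completion(0, null_ptr, WAIT_TIMEOUT))
print(classify_completion(0, live_ptr, 64))  # 64: an arbitrary non-timeout code
print(classify_completion(1, live_ptr, 0))
```

In a polling loop, only the "port-error" case would need the full teardown; a "timeout" could simply lead to another poll.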
Now to run the script. I opened the listening socket on localhost port 10101, so telnet localhost 10101 will make a connection. My first attempt is going to let the GetQueuedCompletionStatus time out.
Traceback (most recent call last):
  File "02 - Accepting.py", line 264, in <module>
    WSACleanup()
WindowsError: [Error 258] The wait operation timed out.
That is to be expected. Now to rerun the script, but do the telnet before the GetQueuedCompletionStatus call times out.
Traceback (most recent call last):
  File "02 - Accepting.py", line 265, in <module>
    raise WinError(err)
WindowsError: [Error 122] The data area passed to a system call is too small.
Oops. Rereading the AcceptEx documentation, it suggests that the space needed for an address should be "at least 16 bytes more than the maximum address length for the transport protocol in use". Ah, my address length calculation was just wrong anyway, regardless of the lack of extra 16 bytes. The following lines are changed:
dwReceiveDataLength = 0
dwLocalAddressLength = sizeof(sockaddr_in) + 16
dwRemoteAddressLength = sizeof(sockaddr_in) + 16
outputBuffer = create_string_buffer(dwReceiveDataLength + dwLocalAddressLength + dwRemoteAddressLength)
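To make the arithmetic concrete, here is a small sketch that works out the sizes involved. The sockaddr_in definition is an assumption on my part: it mirrors the layout of the Winsock C struct and may differ in field types from the one defined earlier in this series.

```python
from ctypes import (Structure, c_char, c_short, c_ubyte, c_ushort,
                    create_string_buffer, sizeof)


class sockaddr_in(Structure):
    # Assumed layout, mirroring the Winsock C struct (16 bytes in total).
    _fields_ = [
        ("sin_family", c_short),
        ("sin_port", c_ushort),
        ("sin_addr", c_ubyte * 4),
        ("sin_zero", c_char * 8),
    ]


dwReceiveDataLength = 0
dwLocalAddressLength = sizeof(sockaddr_in) + 16
dwRemoteAddressLength = sizeof(sockaddr_in) + 16
outputBuffer = create_string_buffer(
    dwReceiveDataLength + dwLocalAddressLength + dwRemoteAddressLength)

# What the first attempt passed for each address length.
old_length = sizeof(sockaddr_in) // 2

print("sockaddr_in: %d, old length: %d, new length: %d, buffer: %d"
      % (sizeof(sockaddr_in), old_length, dwLocalAddressLength,
         sizeof(outputBuffer)))
```

Under that assumed layout, the first attempt told AcceptEx it had 8 bytes per address when it wanted at least 32, which fits the "too small" error.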
I also need to be able to at least naively determine whether the overlapped object I get is one I created. The only one I created was ovAccept, so I misuse my custom channel attribute to store a string.
ovAccept = OVERLAPPED()
ovAccept.channel = "accept"
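The reason this works is that the custom field sits after the fields the operating system knows about, so the pointer handed back later still refers to the same block of memory, tag included. A minimal sketch of that round trip follows. It restates the structure with portable stand-in types (c_uint32 for DWORD, c_size_t and c_void_p for the pointer-sized fields), which are my assumptions, and it fakes the pointer that GetQueuedCompletionStatus would return by casting the structure's own address.

```python
from ctypes import (POINTER, Structure, Union, addressof, cast, c_size_t,
                    c_uint32, c_void_p, py_object)


class _US(Structure):
    _fields_ = [("Offset", c_uint32), ("OffsetHigh", c_uint32)]


class _U(Union):
    _anonymous_ = ("s",)
    _fields_ = [("s", _US), ("Pointer", c_void_p)]


class OVERLAPPED(Structure):
    _anonymous_ = ("u",)
    _fields_ = [
        ("Internal", c_size_t),
        ("InternalHigh", c_size_t),
        ("u", _U),
        ("hEvent", c_void_p),
        # Custom field, placed after everything the OS reads or writes.
        ("channel", py_object),
    ]


ovAccept = OVERLAPPED()
ovAccept.channel = "accept"

# Simulate the completion: all we get back is a pointer to the same memory.
ovCompletedPtr = cast(addressof(ovAccept), POINTER(OVERLAPPED))
tag = ovCompletedPtr.contents.channel
print(tag)
```

The structure has to stay referenced from Python for as long as the operation is pending, since the pointer that comes back is only valid while that memory is alive.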
And add some extra output to the script on success.
print "SUCCESS"
print ovCompletedPtr.contents.channel
Now to paste the script into a python console and see what the output is.
>>> print "SUCCESS"
SUCCESS
>>> print ovCompletedPtr.contents.channel
accept
Now I'll send a message to the accepted connection before disconnecting them. WSASend looks like the right function to call.
int WSASend(
__in SOCKET s,
__in LPWSABUF lpBuffers,
__in DWORD dwBufferCount,
__out LPDWORD lpNumberOfBytesSent,
__in DWORD dwFlags,
__in LPWSAOVERLAPPED lpOverlapped,
__in LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
And the new structure WSABUF.
struct WSABUF {
u_long len;
char FAR *buf;
}
Which gives the following ctypes definitions.
class WSABUF(Structure):
_fields_ = [
("len", c_ulong),
("buf", c_char_p),
]

WSASend = windll.Ws2_32.WSASend
WSASend.argtypes = (SOCKET, POINTER(WSABUF), DWORD, POINTER(DWORD), DWORD, POINTER(OVERLAPPED), c_void_p)
WSASend.restype = c_int
And now to send the message on the accepted connection.
msg = "Disconnecting you.."
sendBuffer = (WSABUF * 1)()
sendBuffer[0].buf = msg
sendBuffer[0].len = len(msg)
bytesSent = DWORD()
ovSend = OVERLAPPED()
ovSend.channel = "send"
ret = WSASend(acceptSocket, cast(sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
if ret != 0:
    err = WSAGetLastError()
    # The operation was successful and is currently in progress. Ignore this error...
    if err != ERROR_IO_PENDING:
        closesocket(acceptSocket)
        closesocket(listenSocket)
        CloseHandle(hIOCP)
        WSACleanup()
        raise WinError(err)
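WSASend takes an array of WSABUF entries, so a message does not have to be one contiguous string. As a sketch of filling that array for more than one chunk, the make_wsabufs helper below is hypothetical and not part of the original script. It uses byte strings so that it behaves the same on Python 2 and 3, and it does not call WSASend, so it runs on any platform.

```python
from ctypes import Structure, c_char_p, c_ulong


class WSABUF(Structure):
    _fields_ = [
        ("len", c_ulong),
        ("buf", c_char_p),
    ]


def make_wsabufs(chunks):
    """Hypothetical helper: build a WSABUF array from a list of byte strings."""
    bufs = (WSABUF * len(chunks))()
    for i, chunk in enumerate(chunks):
        bufs[i].buf = chunk
        bufs[i].len = len(chunk)
    return bufs


chunks = [b"Disconnecting ", b"you.."]
sendBuffers = make_wsabufs(chunks)
total = sum(entry.len for entry in sendBuffers)

print("%d buffers, %d bytes in total" % (len(sendBuffers), total))
```

For an overlapped send, the byte strings and the array should stay referenced until the completion for that send has been dequeued, and dwBufferCount would be len(sendBuffers).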
I'll just repeat the earlier poll for the sake of simplicity.
ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 10000)
if ret == FALSE:
    err = WSAGetLastError()
    closesocket(acceptSocket)
    closesocket(listenSocket)
    CloseHandle(hIOCP)
    WSACleanup()
    raise WinError(err)
The received overlapped object should be the one for the send. Print some output to confirm I got the right one.
if ovCompletedPtr.contents.channel == "send":
    print "SUCCESS"
else:
    print "FAILURE: Unrecognised overlapped object"
At this point the message should have been sent to the accepted connection so I'll now disconnect them and exit.
closesocket(acceptSocket)
closesocket(listenSocket)
CloseHandle(hIOCP)
WSACleanup()
Pasting the script into an open Python console gives the following output:
SUCCESS
And the console where I ran telnet sees the following output:
Disconnecting you..

Connection to host lost.
That's the initial goal achieved. The final goal is to have low-level IOCP networking code which can be wrapped using Stackless Python with a socket module interface so that it can be used in place of stacklesssocket.py.

Next post: IOCP-based sockets with ctypes in Python: 4
Script source code: 02 - Accepting.py

Edited at 6:25PM on 2009-07-25:

Was:
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
CreateIoCompletionPort.restype = HANDLE
Changed to:
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, c_ulong, DWORD)
CreateIoCompletionPort.restype = HANDLE
The pointer, which I had taken to match the C++ definition, had confused me and was leading me to pass in a pointer to a value rather than the value itself in later code based on this. ULONG_PTR is in fact a pointer-sized unsigned integer, not a pointer to a ULONG.
