Sunday 26 July 2009

IOCP-based sockets with ctypes in Python: 5

Previous post: IOCP-based sockets with ctypes in Python: 4

The goal this time is to write an echo server, which means adding support for receiving data from connected sockets and a more general polling loop.

In order to keep track of the current accept socket and any other connected sockets, I need a dictionary to store the relevant state. The intention is that the key will be the completion key and the value will consist of an arbitrary number of elements, the first two of which are the current state ID and then the socket.

stateByKey = {}
I will only need to look up dictionary entries for completion keys other than the listening one, and those entries will be in one of two states: reading or writing.
STATE_WRITING = 1
STATE_READING = 2
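To make the layout concrete, here is a minimal sketch of the kind of entries the dictionary ends up holding. The key values and the placeholder strings standing in for sockets and other objects are made up for illustration, and `DescribeEntry` is a hypothetical helper; only the leading state ID and socket pair is the convention described above.

```python
STATE_WRITING = 1
STATE_READING = 2

# Hypothetical completion keys and placeholder values, for illustration only.
stateByKey = {
    101: (STATE_READING, "socket-101", "keepalive-a", "keepalive-b"),
    102: (STATE_WRITING, "socket-102", "keepalive-c"),
}

def DescribeEntry(key):
    # Only the first two elements have a fixed meaning; the rest depends on
    # whatever the operation in progress needs to keep hold of.
    entry = stateByKey[key]
    state, _socket = entry[0], entry[1]
    extras = entry[2:]
    return state, _socket, len(extras)
```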
I can identify incoming connections being assigned to the accept socket by the LISTEN_COMPLETION_KEY. I create an initial accept socket ready for use when the polling starts.
stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()
After that, the main loop of the script can pump the polling for completed packets.
try:
    while Pump():
        pass
    print "*** Unexpected exit."
except KeyboardInterrupt:
    print "*** Keyboard interrupt."
Cleanup()
The Pump function needs to be able to start an overlapped WSARecv operation on a newly connected socket. When it receives the completed packet for that operation, it needs to start an overlapped WSASend operation to send back what it received on the same socket. It also needs to start another overlapped WSARecv operation when the write completes, so it can repeat the process.

With regard to the overlapped WSASend operation, I've already defined and used that function in a previous post, so I can fetch and repurpose the relevant logic.

The idea is that the function is told which socket to send to and what message to send, and returns whatever needs to be stored in the state dictionary. The state ID and socket come first, as expected. The overlapped object is also returned and stored so that it does not get garbage collected, as mentioned in the previous post, and the message is kept alongside it for the same reason: its memory has to stay valid until the send completes.
def StartOverlappedWrite(_socket, msg):
    sendBuffer = (WSABUF * 1)()
    sendBuffer[0].buf = msg
    sendBuffer[0].len = len(msg)

    bytesSent = DWORD()
    ovSend = OVERLAPPED()

    ret = WSASend(_socket, cast(sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
    if ret != 0:
        err = WSAGetLastError()
        # ERROR_IO_PENDING means the send was queued and will complete later.
        if err != ERROR_IO_PENDING:
            Cleanup()
            raise WinError(err)

    # Keep msg and ovSend referenced until the completed packet arrives.
    return STATE_WRITING, _socket, msg, ovSend
The general use of this function is in the form:
stateByKey[completionKeyValue] = StartOverlappedWrite(stateData[1], msg)
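The garbage collection concern can be shown without any Windows calls. The sketch below uses a stand-in OVERLAPPED structure (its fields are an assumption modelled on the Win32 layout, not the definition from the earlier posts) and weak references to observe whether the object survives once the function that created it returns. `StartAndForget`, `StartAndRemember` and `inFlight` are hypothetical names, and the immediate reclamation relies on CPython's reference counting.

```python
import ctypes
import weakref

class OVERLAPPED(ctypes.Structure):
    # Stand-in definition for illustration only.
    _fields_ = [
        ("Internal", ctypes.c_size_t),
        ("InternalHigh", ctypes.c_size_t),
        ("Offset", ctypes.c_uint32),
        ("OffsetHigh", ctypes.c_uint32),
        ("hEvent", ctypes.c_void_p),
    ]

# Plays the role of stateByKey: anything stored here stays alive.
inFlight = {}

def StartAndForget():
    ov = OVERLAPPED()
    # A real call would pass byref(ov) to WSASend here. Nothing refers to
    # ov after this function returns, so its memory can be reclaimed while
    # the operating system may still be using it.
    return weakref.ref(ov)

def StartAndRemember(key):
    ov = OVERLAPPED()
    # Storing the object keeps it alive until the entry is removed.
    inFlight[key] = ov
    return weakref.ref(ov)

forgotten = StartAndForget()
remembered = StartAndRemember(1)
```

Calling `forgotten()` gives back `None` under CPython because the object is already gone, while `remembered()` still returns the structure held in the dictionary.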
In order to call WSARecv, I first need to define it with ctypes.
int WSARecv(
    __in     SOCKET s,
    __inout  LPWSABUF lpBuffers,
    __in     DWORD dwBufferCount,
    __out    LPDWORD lpNumberOfBytesRecvd,
    __inout  LPDWORD lpFlags,
    __in     LPWSAOVERLAPPED lpOverlapped,
    __in     LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
This has a straightforward ctypes definition.
WSARecv = windll.Ws2_32.WSARecv
WSARecv.argtypes = (SOCKET, POINTER(WSABUF), DWORD, POINTER(DWORD), POINTER(DWORD), POINTER(OVERLAPPED), c_void_p)
WSARecv.restype = c_int
Using it is almost identical to the use of WSASend. The only difference is that I need a buffer for incoming data to be placed into, and I need to return that buffer so that I can see what arrived when the completed packet arrives.
def StartOverlappedRead(_socket):
    recvBuffer = (WSABUF * 1)()
    recvBuffer[0].buf = ' ' * 4096
    recvBuffer[0].len = 4096
    ovRecv = OVERLAPPED()

    numberOfBytesRecvd = DWORD()
    flags = DWORD()
    ret = WSARecv(_socket, cast(recvBuffer, POINTER(WSABUF)), 1, byref(numberOfBytesRecvd), byref(flags), byref(ovRecv), 0)
    if ret != 0:
        err = WSAGetLastError()
        # The operation was successful and is currently in progress. Ignore this error...
        if err != ERROR_IO_PENDING:
            Cleanup()
            raise WinError(err)

    return STATE_READING, _socket, recvBuffer, ovRecv
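One thing worth being careful about is that `' ' * 4096` is an ordinary immutable Python string, and the receive operation writes into its memory. The ctypes documentation recommends `create_string_buffer` when a function needs mutable memory, so a more conservative variant might look like the sketch below. The WSABUF definition here is an assumption (the real one comes from an earlier post and may declare `buf` differently), `MakeRecvBuffer` and `ReceivedData` are hypothetical helpers, and `memmove` merely stands in for the data the operating system would deliver.

```python
import ctypes

class WSABUF(ctypes.Structure):
    # Assumed layout: length first, then a pointer to the data.
    _fields_ = [
        ("len", ctypes.c_uint32),
        ("buf", ctypes.POINTER(ctypes.c_char)),
    ]

def MakeRecvBuffer(size=4096):
    # Mutable storage that is safe for foreign code to write into.
    storage = ctypes.create_string_buffer(size)
    recvBuffer = (WSABUF * 1)()
    recvBuffer[0].buf = ctypes.cast(storage, ctypes.POINTER(ctypes.c_char))
    recvBuffer[0].len = size
    # Return the storage as well, so the caller can keep it alive and read it.
    return storage, recvBuffer

def ReceivedData(storage, numberOfBytes):
    # Slicing the raw contents preserves embedded NUL bytes.
    return storage.raw[:numberOfBytes]

storage, recvBuffer = MakeRecvBuffer()
incoming = b"hello\x00world"
# Stand-in for the operating system filling the buffer on completion.
ctypes.memmove(recvBuffer[0].buf, incoming, len(incoming))
echoed = ReceivedData(storage, len(incoming))
```

Both the storage and the WSABUF array would then need to be kept in the state dictionary until the completed packet arrives.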
Now this can all be linked into the polling for completed packets.

The one special case outside the echo handling of connected sockets is the initial handling of newly connected sockets. Here I start an overlapped read operation on the newly connected socket and create a new accept socket for the next incoming connection.

if completionKey.value == LISTEN_COMPLETION_KEY:
    acceptKey, acceptSocket, ignore = stateByKey[LISTEN_COMPLETION_KEY]
    stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()

    # Do an initial read event on the newly connected socket.
    stateByKey[acceptKey] = StartOverlappedRead(acceptSocket)
When a write operation completes, a new read operation is started for the socket.
if stateByKey[completionKey.value][0] == STATE_WRITING:
    # We'll use the completion of the write to start the next read.
    stateByKey[completionKey.value] = StartOverlappedRead(stateByKey[completionKey.value][1])
And when a read operation completes, I see how many bytes were read, and write that many bytes from the stored read buffer, wrapped in square brackets, back out to the relevant socket. If 0 bytes were read, this signals that the socket has been disconnected, and it can be forgotten about.

numberOfBytes is an output of the call to GetQueuedCompletionStatus, which would appear above this code.
if stateByKey[completionKey.value][0] == STATE_READING:
    # We'll use the completion of the read to do the corresponding write.
    _socket, recvBuffer, ovRecv = stateByKey[completionKey.value][1:]

    # No received bytes indicates the connection has disconnected.
    if numberOfBytes.value == 0:
        del stateByKey[completionKey.value]
        print "DISCONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
        return True

    msg = "["+ recvBuffer[0].buf[:numberOfBytes.value] +"]"
    stateByKey[completionKey.value] = StartOverlappedWrite(_socket, msg)
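The two transitions just described can be exercised without sockets at all. The following is a simplified model rather than the script's code: `HandleCompletion` and the fake socket name are hypothetical, and plain byte strings stand in for the buffers that the real functions hand to WSARecv and WSASend.

```python
STATE_WRITING = 1
STATE_READING = 2

def HandleCompletion(stateByKey, key, numberOfBytes, received=b""):
    """Apply one completed packet to the state table and report what happened."""
    stateData = stateByKey.pop(key)
    state, _socket = stateData[0], stateData[1]

    if state == STATE_WRITING:
        # The write finished, so wait for the next chunk of input.
        stateByKey[key] = (STATE_READING, _socket)
        return "read started"

    if state == STATE_READING:
        if numberOfBytes == 0:
            # Zero bytes means the peer disconnected; the entry stays removed.
            return "disconnected"
        msg = b"[" + received[:numberOfBytes] + b"]"
        stateByKey[key] = (STATE_WRITING, _socket, msg)
        return "write started"

    raise ValueError("unexpected state %r for key %r" % (state, key))

# One connection: receive five bytes, finish echoing them, then disconnect.
table = {7: (STATE_READING, "fake-socket")}
steps = [
    HandleCompletion(table, 7, 5, b"hello"),
    HandleCompletion(table, 7, 7),
    HandleCompletion(table, 7, 0),
]
```

After the three completions the table is empty again, which mirrors a socket being forgotten once it disconnects.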
Putting this all together and massaging it a little gives the following Pump function:
def Pump():
    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    while True:
        ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 500)
        if ret == FALSE:
            err = WSAGetLastError()
            if err == WAIT_TIMEOUT:
                continue
            Cleanup()
            raise WinError(err)
        break

    if completionKey.value == LISTEN_COMPLETION_KEY:
        acceptKey, acceptSocket, ignore = stateByKey[LISTEN_COMPLETION_KEY]
        stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()

        # Do an initial read event on the newly connected socket.
        stateByKey[acceptKey] = StartOverlappedRead(acceptSocket)

        print "CONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
    else:
        stateData = stateByKey[completionKey.value]
        del stateByKey[completionKey.value]

        state = stateData[0]
        if state == STATE_WRITING:
            # We'll use the completion of the write to start the next read.
            stateByKey[completionKey.value] = StartOverlappedRead(stateData[1])
        elif state == STATE_READING:
            # We'll use the completion of the read to do the corresponding write.
            _socket, recvBuffer, ovRecv = stateData[1:]

            # No received bytes indicates the connection has disconnected.
            if numberOfBytes.value == 0:
                print "DISCONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
                return True

            msg = "["+ recvBuffer[0].buf[:numberOfBytes.value] +"]"
            stateByKey[completionKey.value] = StartOverlappedWrite(_socket, msg)
        else:
            Cleanup()
            raise Exception("Unexpected completion key", completionKey, "state", state)

    return True
And at this point the script implements a functional echo server, handling disconnected sockets as intended.
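One way to check this by hand is with a small client built on the standard socket module. The sketch below is not part of the script: `EchoRoundTrip` is a hypothetical helper, and because the listening address of the real server is set up in an earlier post, the example talks to a throwaway stand-in server on a local port that wraps data in brackets the same way. It also assumes a short message arrives in a single read, so that exactly one pair of brackets comes back.

```python
import socket
import threading

def EchoRoundTrip(host, port, payload, timeout=5.0):
    """Send payload and collect the bracketed reply."""
    expected = len(payload) + 2  # one "[" and one "]" around a single chunk
    sock = socket.create_connection((host, port), timeout)
    try:
        sock.sendall(payload)
        reply = b""
        while len(reply) < expected:
            chunk = sock.recv(4096)
            if not chunk:
                break  # the server closed the connection early
            reply += chunk
        return reply
    finally:
        sock.close()

def StartStandInServer():
    """Serve one connection, echoing each chunk wrapped in brackets."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0 lets the system pick a free port
    listener.listen(1)
    port = listener.getsockname()[1]

    def ServeOne():
        conn, _ = listener.accept()
        try:
            while True:
                data = conn.recv(4096)
                if not data:
                    break
                conn.sendall(b"[" + data + b"]")
        finally:
            conn.close()
            listener.close()

    thread = threading.Thread(target=ServeOne)
    thread.daemon = True
    thread.start()
    return port, thread

port, serverThread = StartStandInServer()
reply = EchoRoundTrip("127.0.0.1", port, b"hello")
serverThread.join(5.0)
```

To try it against the real server, call `EchoRoundTrip` with the address and port that the listening socket was bound to instead of starting the stand-in.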

There are a few things that could be polished. For instance, there's no need to allocate new buffers or overlapped objects for each socket for every operation; these could easily be reused. But that's not so important.
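As a rough illustration of that reuse, the buffer and overlapped object could live in a per-connection object that is reset between operations instead of being recreated. Everything here is hypothetical (the `Connection` class, its method names, and the stand-in structure definitions), and it assumes at most one operation is in flight per socket at a time, as in the echo server above. Unlike the script, this sketch would echo the raw bytes without adding brackets, since it sends straight out of the receive storage.

```python
import ctypes

class WSABUF(ctypes.Structure):
    # Assumed layout, for illustration only.
    _fields_ = [("len", ctypes.c_uint32),
                ("buf", ctypes.POINTER(ctypes.c_char))]

class OVERLAPPED(ctypes.Structure):
    # Assumed layout, for illustration only.
    _fields_ = [("Internal", ctypes.c_size_t),
                ("InternalHigh", ctypes.c_size_t),
                ("Offset", ctypes.c_uint32),
                ("OffsetHigh", ctypes.c_uint32),
                ("hEvent", ctypes.c_void_p)]

class Connection(object):
    BUFFER_SIZE = 4096

    def __init__(self, _socket):
        self.socket = _socket
        # Allocated once and kept for the lifetime of the connection.
        self.storage = ctypes.create_string_buffer(self.BUFFER_SIZE)
        self.wsabuf = WSABUF()
        self.wsabuf.buf = ctypes.cast(self.storage, ctypes.POINTER(ctypes.c_char))
        self.overlapped = OVERLAPPED()

    def _Reset(self, length):
        # Clear the overlapped structure before it is used again.
        ctypes.memset(ctypes.addressof(self.overlapped), 0,
                      ctypes.sizeof(self.overlapped))
        self.wsabuf.len = length
        return self.wsabuf, self.overlapped

    def PrepareRead(self):
        # Offer the whole buffer to the next receive.
        return self._Reset(self.BUFFER_SIZE)

    def PrepareWrite(self, numberOfBytes):
        # Send back only the bytes that the last receive filled in.
        return self._Reset(numberOfBytes)

conn = Connection("fake-socket")
readBuf, readOv = conn.PrepareRead()
readOv.Internal = 123  # pretend the operating system touched the structure
writeBuf, writeOv = conn.PrepareWrite(5)
```

The objects handed out for the write are the same ones used for the read, just reset, so the state dictionary would only need to hold the `Connection` itself.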

The tentative long-term goal is to build up a base of more or less reasonable code that covers the required range of IO completion port based networking support, in order to potentially write a new stacklesssocket.py module.

Next post: IOCP-based sockets with ctypes in Python: 6
Script source code: 04 - Echoing.py
