
Friday, 31 July 2009

IOCP-based sockets with ctypes in Python: 7

Previous post: IOCP-based sockets with ctypes in Python: 6

In my last attempt, I managed to get a minimal amount of standard socket code working within my script, making use of the functionality Stackless Python provides. But if I am to make a proper replacement socket module, I need to be able to use it in the same way stacklesssocket.py is used. To this end, the use case I want to support is Andrew Dalke's "more naturally usable" example script.

Originally, the script put the replacement socket module in place with the following code:

import sys
import stacklesssocket
sys.modules["socket"] = stacklesssocket
Back in Python 2.5, when stacklesssocket.py was being developed, the socket module was a lot simpler and making a complete replacement module was feasible. In later versions of Python 2, and also in Python 3, the socket module has become somewhat more complicated, and leaving the standard version in place while substituting parts of it became the practical approach. This led to new code to substitute in that functionality:
import stacklesssocket
stacklesssocket.install()
The goal

Andrew's example script, which I am aiming to support, contains the following code:
import sys
import stacklesssocket
import stackless

stacklesssocket.install()

import urllib
import time

def download(uri):
    t1 = time.time()
    f = urllib.urlopen(uri)
    s = f.read()
    t2 = time.time()
    print "Downloaded", uri, "in", "%.1f" % (t2-t1), "seconds"
    return t2-t1


print " === Serial === "
t1 = time.time()
download("http://www.stackless.com/wiki/Tasklets")
download("http://www.stackless.com/wiki/Channels")
t2 = time.time()
print " --->", t2-t1

print " === Parallel === "
t1 = time.time()
stackless.tasklet(download)("http://www.stackless.com/wiki/Tasklets")
stackless.tasklet(download)("http://www.stackless.com/wiki/Channels")
stackless.run()
t2 = time.time()
print " --->", t2-t1
The most important aspect of this is that it uses another, higher-level standard library module (urllib), which in turn uses the standard socket module. By using my replacement functionality in this way, I can verify how compatible and stable it is.

The implementation

The original stacklesssocket.py module was based on asyncore, a framework that wraps select. Back when I wrote it, I was glad to have it there to use, but adopting a framework brings in dependencies and arbitrary abstraction. If I had the time and interest, I would rewrite stacklesssocket.py to use select directly.

At this time, I am developing for Stackless Python 2.6.2. Right off the bat, it was obvious that this new module was going to be somewhat simpler. stacklesssocket.py made several substitutions in the standard socket module, but in this case it looks like I just need to substitute my own _realsocket object.
def install():
    stdsocket._realsocket = socket
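A minimal sketch of this install-by-substitution pattern, with a hypothetical uninstall() added so that the original class can be restored, for example between tests. A throwaway module object stands in for the real socket module (Python 3's socket module has no _realsocket attribute), and FakeSocket is only a placeholder for the replacement class.

```python
import types

# Stand-in for the standard socket module (illustration only).
stdsocket = types.ModuleType("stdsocket")
stdsocket._realsocket = object  # pretend this is the original low-level socket type


class FakeSocket(object):
    """Placeholder for the IOCP-backed replacement socket class."""


_saved = {}


def install():
    # Record the original only on the first call, so repeated installs stay reversible.
    _saved.setdefault("_realsocket", stdsocket._realsocket)
    stdsocket._realsocket = FakeSocket


def uninstall():
    # Put the original back, if install() was ever called.
    if "_realsocket" in _saved:
        stdsocket._realsocket = _saved.pop("_realsocket")
```

Calling install() twice is harmless here, because only the first call records the original.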
The first obstacle was that my replacement socket object lacked a connect method. Of the functions documented on MSDN, ConnectEx was the one that suited my overlapped IO needs. However, I was unable to find it exported from any Winsock-related DLL. It turns out that you need to call WSAIoctl to get a pointer to it. I had been avoiding WSAIoctl, as I assumed it was only used to make faster function calls, but since it is the gateway to functionality I need, avoiding it is no longer an option.
int WSAIoctl(
    __in   SOCKET s,
    __in   DWORD dwIoControlCode,
    __in   LPVOID lpvInBuffer,
    __in   DWORD cbInBuffer,
    __out  LPVOID lpvOutBuffer,
    __in   DWORD cbOutBuffer,
    __out  LPDWORD lpcbBytesReturned,
    __in   LPWSAOVERLAPPED lpOverlapped,
    __in   LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
The corresponding ctypes definition is as follows:
WSAIoctl = windll.Ws2_32.WSAIoctl
WSAIoctl.argtypes = (SOCKET, DWORD, c_void_p, DWORD, c_void_p, DWORD, POINTER(DWORD), POINTER(OVERLAPPED), c_void_p)
WSAIoctl.restype = c_int
The dwIoControlCode value that retrieves a function pointer is SIO_GET_EXTENSION_FUNCTION_POINTER. To indicate that ConnectEx is the function whose pointer I want, lpvInBuffer points to the following GUID (WSAID_CONNECTEX):
{ 0x25a207b9,0xddf3,0x4660, { 0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e }}
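For reference, a sketch of how that GUID and the control code could be declared with ctypes; the post's own definitions aren't shown, so this is an assumption. The field layout follows the Windows GUID definition, and the control code is derived from the winsock2.h macros (_WSAIORW(IOC_WS2, 6)), which gives 0xC8000006. Fixed-width types are used so the structure is also 16 bytes off Windows.

```python
from ctypes import Structure, c_uint8, c_uint16, c_uint32, sizeof


class GUID(Structure):
    # Windows GUID layout: one 32-bit, two 16-bit and eight 8-bit fields (16 bytes).
    _fields_ = [
        ("Data1", c_uint32),
        ("Data2", c_uint16),
        ("Data3", c_uint16),
        ("Data4", c_uint8 * 8),
    ]


# The ConnectEx GUID quoted above.
WSAID_CONNECTEX = GUID(
    0x25a207b9, 0xddf3, 0x4660,
    (c_uint8 * 8)(0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e))

# _WSAIORW(IOC_WS2, 6) expands to IOC_IN | IOC_OUT | IOC_WS2 | 6.
IOC_IN = 0x80000000
IOC_OUT = 0x40000000
IOC_WS2 = 0x08000000
SIO_GET_EXTENSION_FUNCTION_POINTER = IOC_IN | IOC_OUT | IOC_WS2 | 6
```

These are the values the WSAIoctl call in the __init__ code passes as SIO_GET_EXTENSION_FUNCTION_POINTER, byref(WSAID_CONNECTEX) and sizeof(WSAID_CONNECTEX).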
I'm not doing this as an overlapped operation, as I can't see how it could block in any meaningful way. This resulted in the following code in the replacement socket's __init__ method:
        self.wsFnConnectEx = ConnectExFunc()
        dwBytes = DWORD()
        ret = WSAIoctl(_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, byref(WSAID_CONNECTEX), sizeof(WSAID_CONNECTEX), byref(self.wsFnConnectEx), sizeof(self.wsFnConnectEx), byref(dwBytes), cast(0, POINTER(OVERLAPPED)), 0)
        if ret == SOCKET_ERROR:
            err = WSAGetLastError()
            # Close the newly created socket (ret only holds SOCKET_ERROR here).
            closesocket(_socket)
            raise WinError(err)
With this out of the way, I can now move on to fleshing out the connect method.
    def connect(self, address):
        host, port = address

        # ConnectEx requires the socket to be bound to a local address first.
        self.bind(("0.0.0.0", 0))

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        bytesSent = DWORD(0)
        ovConnect = OVERLAPPED()
        c = ovConnect.channel = stackless.channel()

        ret = self.wsFnConnectEx(self._socket, sa, sizeof(sa), 0, 0, NULL, byref(ovConnect))
        if ret == FALSE:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                raise WinError(err)

        activeIO[self._socket] = c
        c.receive()
ConnectEx did not work on the first attempt; it turns out that a socket must be bound to a local address before you can connect it using ConnectEx. Oh well, time to retry Andrew's script and see how I go.

This time, the missing sendall method was preventing the script from running. sendall simply keeps calling send with the remaining data until all of it has been sent.
    def sendall(self, msg, flags=None):
        bytesSent = self.send(msg)
        while bytesSent < len(msg):
            bytesSent += self.send(msg[bytesSent:])
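The loop can be exercised without a network by a hypothetical stand-in whose send accepts at most a few bytes per call, which is exactly the partial-write case sendall exists for:

```python
class PartialSender(object):
    """Hypothetical stand-in whose send() accepts at most `limit` bytes per call."""

    def __init__(self, limit):
        self.limit = limit
        self.sent = []

    def send(self, msg):
        # Accept only the first `limit` bytes and report how many were taken.
        chunk = msg[:self.limit]
        self.sent.append(chunk)
        return len(chunk)

    def sendall(self, msg, flags=None):
        # Same loop as above: keep sending the unsent tail until nothing is left.
        bytesSent = self.send(msg)
        while bytesSent < len(msg):
            bytesSent += self.send(msg[bytesSent:])


s = PartialSender(limit=4)
s.sendall(b"hello, world")
```

The loop relies on every send call making progress; a send that kept returning 0 would make it spin forever.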
At this point I decided to take into account that operations performed with an overlapped object may actually complete right away. However, my mistake was in assuming that if this happened, no completion packet would arrive through GetQueuedCompletionStatus. In fact, unless the handle has been configured otherwise (with SetFileCompletionNotificationModes and FILE_SKIP_COMPLETION_PORT_ON_SUCCESS), a packet is still queued. This led to crashes when I examined the channel attribute of arriving packets whose overlapped object had already been garbage collected, because the function that started the operation had exited.

To clean things up, I decided to store a reference to the overlapped object in a global variable, and also to move the channel reference out of the overlapped object so that it is held alongside it. I could have stored the socket handle in the overlapped object, but since CreateIoCompletionPort allows me to associate a completion key with the socket, I store it there instead.
        # Bind the socket to the shared IO completion port.
        CreateIoCompletionPort(_socket, hIOCP, _socket, NULL)
The idea was that I could then ignore packets that arrived for earlier operations which had actually completed immediately. Of course, code that has just got the result of one IO operation may go on to start another, which this time might block. The unwanted packets for the operations that had completed immediately would then arrive and be taken as results for the follow-up blocking operation. Errors ensued. So each completion packet needs to be matched up with the operation state stored for it.
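That matching can be sketched with plain Python objects. This is an illustration under assumptions, not the module's code: strings stand in for the (channel, overlapped) pairs, socket handle 5 is arbitrary, and start_op and complete_op are hypothetical helpers. A stale packet for an operation that completed immediately is paired with its own entry, leaving the follow-up operation's entry waiting.

```python
import itertools

# Pending operations keyed by (socket handle, operation id), in the spirit of activeOps.
activeOps = {}
_nextOpID = itertools.count(1)


def start_op(sock, state):
    """Record a new operation's state and return its id (no real IO is issued)."""
    opID = next(_nextOpID)
    activeOps[(sock, opID)] = state
    return opID


def complete_op(sock, opID):
    """Match a completion packet to the operation that produced it, and forget it."""
    return activeOps.pop((sock, opID))


# The first operation completed immediately, but its packet is still queued
# when the follow-up operation is started.
first = start_op(5, "recv #1")
second = start_op(5, "recv #2")
stale = complete_op(5, first)  # the stale packet carries the first operation's id
```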

This left connect in the following state:
    def connect(self, address):
        host, port = address

        self.bind(("0.0.0.0", 0))

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        bytesSent = DWORD(0)
        ovConnect = OVERLAPPED()
        opID = ovConnect.opID = self.__getnextopid()
        c = stackless.channel()
        c.preference = 0
        ovConnect.label = "connect"

        # Keep the channel and overlapped object alive until the completion packet arrives.
        activeOps[(self._socket, opID)] = (c, ovConnect)

        ret = self.wsFnConnectEx(self._socket, sa, sizeof(sa), 0, 0, NULL, byref(ovConnect))
        if ret == FALSE:
            err = WSAGetLastError()
            if err != ERROR_IO_PENDING:
                # A call that fails outright queues no completion packet, so forget the operation.
                del activeOps[(self._socket, opID)]
                raise WinError(err)

        c.receive()
The other overlapped IO operations also needed to be changed to match, along with _DispatchIOCP.
def _DispatchIOCP():
    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    def _GetCompletionChannel(completionKey, overlappedPtr):
        _socket = completionKey.value
        opID = overlappedPtr.contents.opID
        k = _socket, opID
        c, ovRef = activeOps[k]
        del activeOps[k]

        return c

    while stackless.runcount > 2:
        while True:
            stackless.schedule()

            c = None
            ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 50)
            if ret == FALSE:
                err = WSAGetLastError()
                if err == WAIT_TIMEOUT:
                    continue

                if not bool(ovCompletedPtr):
                    raise WinError(err)

                c = _GetCompletionChannel(completionKey, ovCompletedPtr)
                c.send_exception(WinError, err)
                continue

            break

        c = _GetCompletionChannel(completionKey, ovCompletedPtr)
        if c.balance == -1:
            c.send(numberOfBytes.value)
Note that accessing ovCompletedPtr.contents raises an error if the pointer is NULL. The safe way to check for a NULL pointer with ctypes is to test its boolean value, which is False for NULL.
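A portable sketch of that check, using a hypothetical OVERLAPPED_STUB with a single opID field. It also shows a related ctypes detail: .contents builds a new Python wrapper object on every access, so a plain Python attribute set on the original object (as opposed to a declared field) is not visible through the pointer. Per-operation data that must be read back through ovCompletedPtr therefore needs to be a real field, or be looked up elsewhere, as activeOps does.

```python
from ctypes import POINTER, Structure, c_ulong, pointer


class OVERLAPPED_STUB(Structure):
    # Hypothetical stand-in: the real OVERLAPPED has more fields than this.
    _fields_ = [("opID", c_ulong)]


def read_op_id(ovPtr):
    """Return the opID behind ovPtr, or None if the pointer is NULL."""
    if not ovPtr:  # a NULL ctypes pointer is falsy
        return None
    return ovPtr.contents.opID


nullPtr = POINTER(OVERLAPPED_STUB)()
ov = OVERLAPPED_STUB(opID=7)
ov.note = "plain Python attribute, not a ctypes field"
```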

Running Andrew's script at this point results in it exiting cleanly, but mid-execution. The reason for this is the condition stackless.runcount > 2. The theory was that this would keep the loop running as long as there were more than two tasklets in the scheduler, but this is just plain wrong and ill thought out. In fact, at least for Andrew's script, the runcount is only likely to be one at any given time, because all the other tasklets doing IO are outside of the scheduler, blocked on a channel. So even a runcount of one, the lowest possible value, may require continued looping.

Taking this into account, I added a weakref dictionary to track the live socket objects, and made the existence of live sockets an additional reason to keep looping. This does not really address the problem above, but it does the trick. Andrew's script now runs, and gives the following output:
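A minimal sketch of that extra loop condition, assuming the dispatcher is itself the one runnable tasklet counted by stackless.runcount (so runcount > 1 means other tasklets can run). Sock, register and should_keep_polling are hypothetical names, and a WeakValueDictionary is one way to build the weakref dictionary: an entry vanishes once its socket object is no longer referenced anywhere else.

```python
import weakref


class Sock(object):
    """Hypothetical stand-in for the replacement socket class."""


liveSockets = weakref.WeakValueDictionary()


def register(sock):
    # Keyed by id() purely for illustration; the real key might be the socket handle.
    liveSockets[id(sock)] = sock
    return sock


def should_keep_polling(runcount):
    # Tasklets blocked on a socket's channel are not counted in runcount,
    # so any live socket is also a reason to keep dispatching.
    return runcount > 1 or len(liveSockets) > 0
```

In CPython the entry disappears as soon as the last reference goes away; other interpreters may only remove it after a garbage collection pass.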
 === Serial ===
Downloaded http://www.stackless.com/wiki/Tasklets in 3.4 seconds
Downloaded http://www.stackless.com/wiki/Channels in 3.9 seconds
---> 7.26900005341
=== Parallel ===
Downloaded http://www.stackless.com/wiki/Channels in 3.3 seconds
Downloaded http://www.stackless.com/wiki/Tasklets in 3.5 seconds
---> 3.51300001144
Success (of a sort)!

Of course the module in no way provides the full functionality of the standard version yet. Trivial aspects no-one in their right mind should be using, like UDP and so forth, are completely unsupported.

Script source code: 06 - Polished Socket.py

Tuesday, 28 July 2009

IOCP-based sockets with ctypes in Python: 6

Previous post: IOCP-based sockets with ctypes in Python: 5

The goal of this step is to implement part of a Stackless-compatible socket module that provides the basic functionality of the standard one. It should be usable to provide blocking socket IO within Stackless tasklets.

The most straightforward approach is to base the code on how it was done with stacklesssocket.py. In stacklesssocket.py, a tasklet is launched to poll asyncore.

managerRunning = False

def ManageSockets():
    global managerRunning

    while len(asyncore.socket_map):
        # Check the sockets for activity.
        asyncore.poll(0.05)
        # Yield to give other tasklets a chance to be scheduled.
        stackless.schedule()

    managerRunning = False
There I was looping and polling asyncore. I can easily substitute in the GetQueuedCompletionStatus polling I have already built up.
managerRunning = False

def ManageSockets():
    global managerRunning, hIOCP

    wsaData = WSADATA()
    ret = WSAStartup(MAKEWORD(2, 2), LP_WSADATA(wsaData))
    if ret != 0:
        raise WinError(ret)

    # hIOCP is global so that socket instances can associate themselves with it.
    hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL_HANDLE, NULL, NULL)
    if hIOCP == 0:
        WSACleanup()
        raise WinError()

    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    while True:
        while True:
            # Yield to give other tasklets a chance to be scheduled.
            stackless.schedule()

            ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 50)
            if ret == FALSE:
                err = WSAGetLastError()
                if err == WAIT_TIMEOUT:
                    continue

                ovCompletedPtr.contents.channel.send_exception(WinError, err)
                continue

            break

        # Handle the completed packet.
        ovCompletedPtr.contents.channel.send(numberOfBytes.value)

    managerRunning = False
There are several things to note here.

As in the asyncore polling, I regularly call stackless.schedule() in order to yield to the scheduler and allow any other tasklets that might be present within it to get a chance to execute.

The custom channel attribute of the OVERLAPPED structure is expected to have had a stackless.channel instance assigned to it. In the case of an error, I can wake up the tasklet that initiated the asynchronous IO and raise an exception on it. In the case of success, I can pass useful information back to the code that triggered the IO, so that it can more easily handle the success case. The only piece of information I can see being of use is numberOfBytes, as passing it back saves me from having to query it manually using WSAGetOverlappedResult.

Now I need to make a socket class that is compatible with the standard Python one.
class socket:
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=IPPROTO_TCP):
        ret = WSASocket(family, type, proto, None, 0, WSA_FLAG_OVERLAPPED)
        if ret == INVALID_SOCKET:
            raise WinError()

        # Bind the socket to the shared IO completion port.
        CreateIoCompletionPort(ret, hIOCP, NULL, NULL)

        self._socket = ret
        # Per-socket buffers used by send and recv.
        self.sendBuffer = (WSABUF * 1)()
        self.recvBuffer = None
That gives me a fully set up socket object on instantiation. Moving the send, recv, accept and bind code I have already written onto it as methods should be similarly straightforward.

Starting with send:
    def send(self, data):
        self.sendBuffer[0].buf = data
        self.sendBuffer[0].len = len(data)

        bytesSent = DWORD()
        ovSend = OVERLAPPED()
        c = ovSend.channel = stackless.channel()

        ret = WSASend(self._socket, cast(self.sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
        if ret != 0:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                Cleanup()
                raise WinError(err)

        # Return the number of bytes that were sent.
        return c.receive()
This was a pretty direct conversion. The only additions were setting the channel attribute, blocking on the channel until it returns the number of bytes sent (or raises an exception), and returning that number to the caller.

Next is the recv method:
    def recv(self, byteCount, flags=0):
        if self.recvBuffer is None:
            self.recvBuffer = (WSABUF * 1)()
            self.recvBuffer[0].buf = ' ' * READ_BUFFER_SIZE

        # WARNING: For now, we cap the readable amount to size of the preallocated buffer.
        byteCount = min(byteCount, READ_BUFFER_SIZE)
        # Let WSARecv fill no more than the capped number of bytes requested.
        self.recvBuffer[0].len = byteCount

        numberOfBytesRecvd = DWORD()
        flags = DWORD()
        ovRecv = OVERLAPPED()
        c = ovRecv.channel = stackless.channel()

        ret = WSARecv(self._socket, cast(self.recvBuffer, POINTER(WSABUF)), 1, byref(numberOfBytesRecvd), byref(flags), byref(ovRecv), 0)
        if ret != 0:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                raise WinError(err)

        # Block until the overlapped operation completes.
        numberOfBytes = c.receive()
        return self.recvBuffer[0].buf[:numberOfBytes]
This was also straightforward. Again, a channel was provided and blocked upon. A received data buffer is created on the first call and reused on subsequent calls, and the amount of data that is allowed to be received is capped to the size of that buffer. On success and a returned number of bytes received, the appropriate segment of the data buffer is sliced and returned to the caller.
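The WSABUF definition is not shown in these posts, so this is an assumption: if its buf field is declared as c_char_p, reading .buf back returns a copy of the data only up to the first NUL byte, so binary data containing zeros would be truncated by the slice above. (Pointing WSARecv at a Python string also lets Windows write into memory that Python treats as immutable.) A ctypes-owned buffer from create_string_buffer avoids both problems. The sketch below simulates the bytes WSARecv would deliver with memmove and compares the two ways of reading them back.

```python
from ctypes import c_char_p, cast, create_string_buffer, memmove

READ_BUFFER_SIZE = 16  # small size, just for the demonstration

buf = create_string_buffer(READ_BUFFER_SIZE)  # writable memory owned by ctypes
incoming = b"ab\x00cd"                        # pretend WSARecv wrote these bytes
memmove(buf, incoming, len(incoming))
numberOfBytes = len(incoming)

viaRaw = buf.raw[:numberOfBytes]     # keeps embedded NUL bytes
viaCharP = cast(buf, c_char_p).value  # stops at the first NUL byte
```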

If the remote end disconnects, the channel indicates that 0 bytes were received, and an empty string is returned. This is fine, as an empty string is the standard way for recv to indicate that the connection has been closed.

bind is extremely straightforward, so that came next:
    def bind(self, address):
        host, port = address

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        ret = bind(self._socket, sockaddr_inp(sa), sizeof(sa))
        if ret == SOCKET_ERROR:
            raise WinError()
As is listen:
    def listen(self, backlog):
        ret = listen(self._socket, backlog)
        if ret != 0:
            raise WinError()
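For reference, a portable sketch of the sockaddr_in layout that bind fills in, using Python's socket.inet_aton and socket.htons in place of the Winsock inet_addr and htons. It is not the post's definition: sin_addr is kept as four raw bytes instead of the in_addr union, to sidestep byte-order and c_ulong size differences between platforms, and make_sockaddr_in is a hypothetical helper name. AF_INET is 2 on Windows, Linux and macOS.

```python
import socket
from ctypes import Structure, c_char, c_ubyte, c_ushort, sizeof


class sockaddr_in(Structure):
    # 16-byte IPv4 address layout; sin_addr holds the address bytes in network order.
    _fields_ = [
        ("sin_family", c_ushort),
        ("sin_port", c_ushort),
        ("sin_addr", c_ubyte * 4),
        ("sin_zero", c_char * 8),
    ]


AF_INET = 2


def make_sockaddr_in(host, port):
    sa = sockaddr_in()
    sa.sin_family = AF_INET
    sa.sin_port = socket.htons(port)  # stored in network byte order
    sa.sin_addr = (c_ubyte * 4)(*bytearray(socket.inet_aton(host)))
    return sa
```

The bind to ("0.0.0.0", 0) that connect performs before ConnectEx corresponds to make_sockaddr_in("0.0.0.0", 0): a wildcard address and port zero, letting the system pick the local address and port.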
A little more complex is accept:
    def accept(self):
        dwReceiveDataLength = 0
        dwLocalAddressLength = sizeof(sockaddr_in) + 16
        dwRemoteAddressLength = sizeof(sockaddr_in) + 16
        outputBuffer = create_string_buffer(dwReceiveDataLength + dwLocalAddressLength + dwRemoteAddressLength)

        dwBytesReceived = DWORD()
        ovAccept = OVERLAPPED()
        c = ovAccept.channel = stackless.channel()

        acceptSocket = socket()

        ret = AcceptEx(self._socket, acceptSocket._socket, outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(dwBytesReceived), byref(ovAccept))
        if ret == FALSE:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                closesocket(acceptSocket._socket)
                raise WinError(err)

        # Block until the overlapped operation completes.
        c.receive()

        localSockaddr = sockaddr_in()
        localSockaddrSize = c_int(sizeof(sockaddr_in))
        remoteSockaddr = sockaddr_in()
        remoteSockaddrSize = c_int(sizeof(sockaddr_in))

        GetAcceptExSockaddrs(outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(localSockaddr), byref(localSockaddrSize), byref(remoteSockaddr), byref(remoteSockaddrSize))

        hostbuf = create_string_buffer(NI_MAXHOST)
        servbuf = c_char_p()

        port = ntohs(localSockaddr.sin_port)

        localSockaddr.sin_family = AF_INET
        ret = getnameinfo(localSockaddr, sizeof(sockaddr_in), hostbuf, sizeof(hostbuf), servbuf, 0, NI_NUMERICHOST)
        if ret != 0:
            err = WSAGetLastError()
            closesocket(acceptSocket._socket)
            raise WinError(err)

        # host = inet_ntoa(localSockaddr.sin_addr)

        return (acceptSocket, (hostbuf.value, port))
This was refactored in much the same way as the other asynchronous methods (send and recv), with a channel to block on for the result. However, it got a little more complex when it came to providing the address part of the return value. When I connect with telnet to a standard Python socket listening on my local machine, from that same machine, I see the host as the standard localhost address (127.0.0.1). However, this did not give the same result:
host = inet_ntoa(localSockaddr.sin_addr)
Instead it gave the address 0.0.0.0. So I looked at how Python's low-level C source code does it, and saw that it calls getnameinfo. My next attempt, based on that, was the following code:
        localSockaddr.sin_family = AF_INET
        ret = getnameinfo(localSockaddr, sizeof(sockaddr_in), hostbuf, sizeof(hostbuf), servbuf, 0, NI_NUMERICHOST)
        if ret != 0:
            err = WSAGetLastError()
            closesocket(acceptSocket._socket)
            raise WinError(err)
However, this gives the same host address. For now, you can see both approaches in the accept logic (the first one commented out). I have to assume that this is just some idiosyncrasy that does not really matter.
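One thing that may be worth checking here (not verified against the full script): MSDN declares GetAcceptExSockaddrs's LocalSockaddr and RemoteSockaddr parameters as sockaddr ** (LPSOCKADDR *). The function does not copy addresses into caller-supplied structures; it sets the caller's pointer variables to point into outputBuffer. Passing byref(localSockaddr) with a sockaddr_in structure, as accept does above, may therefore leave pointer bytes in that structure instead of an address, depending on how GetAcceptExSockaddrs.argtypes is declared (that declaration isn't shown). Separately, the standard accept returns the peer's address, which corresponds to the remote address rather than the local one. The portable sketch below only simulates the Windows call: a hypothetical sockaddr_in is placed inside a plain buffer at an arbitrary offset, a pointer variable is made to point at it the way GetAcceptExSockaddrs would, and the host and port are read back through that pointer.

```python
import socket
from ctypes import (POINTER, Structure, addressof, c_char, c_ubyte, c_ushort,
                    c_void_p, create_string_buffer, memmove, sizeof)


class sockaddr_in(Structure):
    # Portable 16-byte IPv4 layout; sin_addr kept as raw network-order bytes.
    _fields_ = [
        ("sin_family", c_ushort),
        ("sin_port", c_ushort),
        ("sin_addr", c_ubyte * 4),
        ("sin_zero", c_char * 8),
    ]


# Simulated AcceptEx output buffer holding the peer address at an arbitrary offset.
outputBuffer = create_string_buffer(64)
OFFSET = 16
peer = sockaddr_in(2, socket.htons(3000), (c_ubyte * 4)(127, 0, 0, 1))
memmove(addressof(outputBuffer) + OFFSET, addressof(peer), sizeof(peer))

# A sockaddr ** out-parameter means the callee writes an address into our
# pointer variable; simulate that write with memmove.
remotePtr = POINTER(sockaddr_in)()
target = c_void_p(addressof(outputBuffer) + OFFSET)
memmove(addressof(remotePtr), addressof(target), sizeof(c_void_p))

remote = remotePtr.contents  # reads directly from inside outputBuffer
host = socket.inet_ntoa(bytes(bytearray(remote.sin_addr)))
port = socket.ntohs(remote.sin_port)
```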

One thing I did notice, though, was that my in_addr structure was not working correctly. It turns out that I had defined the in_addr part of the sockaddr_in structure incorrectly.

Instead of:
class _UN(Structure):
    _fields_ = [
        ("s_un_b", _UN_b),
        ("s_un_w", _UN_w),
        ("s_addr", c_ulong),
    ]

class in_addr(Union):
    _fields_ = [
        ("s_un", _UN),
    ]
    _anonymous_ = ("s_un",)
I should have had:
class in_addr(Union):
    _fields_ = [
        ("s_un_b", _UN_b),
        ("s_un_w", _UN_w),
        ("s_addr", c_ulong),
    ]
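As a foreign function interface solution, ctypes is extremely impressive and allows you to do pretty much anything you want. Unions, however, seem to be strangely hard to get right. In the first version, _UN was a Structure, so its three members were laid out one after another instead of overlapping, and s_addr did not share storage with the byte and word views. With this change, the in_addr field of sockaddr_in now works correctly, although it did not fix my host address problem.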

This completes the basic range of socket functionality. A good next step from here is to write a simple echo server, with standard Python blocking socket usage within Stackless Python.
def Run():
    address = ("127.0.0.1", 3000)
    listenSocket = socket(AF_INET, SOCK_STREAM)
    listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    listenSocket.bind(address)
    listenSocket.listen(5)

    def handle_echo(_socket, _address):
        # Use the socket passed in, not currentSocket, which the accept loop reassigns.
        while True:
            data = _socket.recv(256)
            if data == "":
                print _address, "DISCONNECTED"
                return

            print _address, "READ", data, len(data)
            dlen = _socket.send(data)
            print _address, "ECHOD", dlen

    while True:
        print "Waiting for new connection"
        currentSocket, clientAddress = listenSocket.accept()
        print "Connection", currentSocket.fileno(), "from", clientAddress

        stackless.tasklet(handle_echo)(currentSocket, clientAddress)

if __name__ == "__main__":
    StartManager()

    print "STARTED"
    stackless.tasklet(Run)()
    stackless.run()
    print "EXITED"
This works perfectly; however, there is one problem. When Ctrl-C is pressed, the Python interpreter crashes with an access violation or something similar.

A reasonable assumption is that Stackless tasklets blocked on IOCP-related operations are not being cleaned up properly. To make sure the required cleanup happens, I wrapped the main polling loop with suitable logic.
    try:
        # Call the polling loop (the nested while loops shown above go here).
        pass
    finally:
        _CleanupActiveIO()
        CloseHandle(hIOCP)
        WSACleanup()
The definition of _CleanupActiveIO was as follows:
def _CleanupActiveIO():
    for k, v in activeIO.items():
        ret = CancelIo(k)
        if ret == 0:
            raise WinError()

        # Any tasklets blocked on IO are killed silently.
        v.send_exception(TaskletExit)
Raising a real exception on the blocked tasklets causes the interpreter to exit on the given tasklet with that exception. I want them to be killed silently after cleaning up properly, and for the exception on the main tasklet to be the one that causes the interpreter to exit and is displayed.

In order for the cleanup method to know about the channels in use and tasklets blocked on them, any method that started asynchronous IO had the following line added before its channel receive call.
        activeIO[self._socket] = c
CancelIo is, of course, a Windows function.
BOOL WINAPI CancelIo(
    __in  HANDLE hFile
);
With the matching ctypes definition:
CancelIo = windll.kernel32.CancelIo
CancelIo.argtypes = (HANDLE,)
CancelIo.restype = BOOL
Next steps

At this point, I have a socket object that can be used with Stackless Python to do synchronous IO, while transparently doing asynchronous IO with IO completion ports behind the scenes. It still needs to be fleshed out into a module that can be monkey-patched in place of the standard socket module, as stacklesssocket.py can.

Next post: IOCP-based sockets with ctypes in Python: 7
Script source code: 05 - Socket.py

Sunday, 26 July 2009

IOCP-based sockets with ctypes in Python: 5

Previous post: IOCP-based sockets with ctypes in Python: 4

The goal this time is to write an echo server, which will drive the addition of support for receiving data from connected sockets, as well as a more general polling loop.

To keep track of the current accept socket and any other connected sockets, I need a dictionary to store the relevant state. The key will be the completion key, and the value will be a tuple of an arbitrary number of elements, the first two of which are the current state ID and the socket.

stateByKey = {}
For completion keys other than the listening socket's, the stored state will be one of two values, reading or writing:
STATE_WRITING = 1
STATE_READING = 2
I can identify incoming connections being assigned to the accept socket by the LISTEN_COMPLETION_KEY. I create an initial accept socket ready for use when the polling starts.
stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()
After which the main loop of the script can pump the polling for completed packets.
try:
    while Pump():
        pass
    print "*** Unexpected exit."
except KeyboardInterrupt:
    print "*** Keyboard interrupt."
Cleanup()
The Pump function needs to be able to start an overlapped WSARecv operation on a newly connected socket. When it receives the completion packet for that operation, it needs to start an overlapped WSASend operation to send back what it received on the same socket. It also needs to start another overlapped WSARecv operation when the write completes, so that it can repeat the process.

With regard to the overlapped WSASend operation, I've already defined and used that function in a previous post, so I can fetch and repurpose the relevant logic.

The function is told which socket to send to and what message to send, and returns everything that needs to be stored in the state dictionary. The expected state ID and socket are returned, but so is a reference to the overlapped object, which is stored so that it does not get garbage collected, as mentioned in the previous post.
def StartOverlappedWrite(_socket, msg):
    sendBuffer = (WSABUF * 1)()
    sendBuffer[0].buf = msg
    sendBuffer[0].len = len(msg)

    bytesSent = DWORD()
    ovSend = OVERLAPPED()

    ret = WSASend(_socket, cast(sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
    if ret != 0:
        err = WSAGetLastError()
        if err != ERROR_IO_PENDING:
            Cleanup()
            raise WinError(err)

    return STATE_WRITING, _socket, ovSend
The general use of this function is in the form:
stateByKey[completionKey.value] = StartOverlappedWrite(_socket, msg)
In order to call WSARecv, I first need to define it with ctypes.
int WSARecv(
    __in     SOCKET s,
    __inout  LPWSABUF lpBuffers,
    __in     DWORD dwBufferCount,
    __out    LPDWORD lpNumberOfBytesRecvd,
    __inout  LPDWORD lpFlags,
    __in     LPWSAOVERLAPPED lpOverlapped,
    __in     LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
With a straightforward ctypes definition:
WSARecv = windll.Ws2_32.WSARecv
WSARecv.argtypes = (SOCKET, POINTER(WSABUF), DWORD, POINTER(DWORD), POINTER(DWORD), POINTER(OVERLAPPED), c_void_p)
WSARecv.restype = c_int
Using it is almost identical to the use of WSASend. The only difference is that I need a buffer for incoming data to be placed into, and I need to return that buffer so that I can see what arrived when the completed packet arrives.
def StartOverlappedRead(_socket):
    recvBuffer = (WSABUF * 1)()
    recvBuffer[0].buf = ' ' * 4096
    recvBuffer[0].len = 4096
    ovRecv = OVERLAPPED()

    numberOfBytesRecvd = DWORD()
    flags = DWORD()
    ret = WSARecv(_socket, cast(recvBuffer, POINTER(WSABUF)), 1, byref(numberOfBytesRecvd), byref(flags), byref(ovRecv), 0)
    if ret != 0:
        err = WSAGetLastError()
        # The operation was successful and is currently in progress. Ignore this error...
        if err != ERROR_IO_PENDING:
            Cleanup()
            raise WinError(err)

    return STATE_READING, _socket, recvBuffer, ovRecv
Now this can all be linked into the polling for completed packets.

Apart from the echo handling of connected sockets, the special case is the initial handling of newly connected sockets. Here I start an overlapped read operation on the newly connected socket and create a new accept socket for the next incoming connection.

if completionKey.value == LISTEN_COMPLETION_KEY:
    acceptKey, acceptSocket, ignore = stateByKey[LISTEN_COMPLETION_KEY]
    stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()

    # Do an initial read event on the newly connected socket.
    stateByKey[acceptKey] = StartOverlappedRead(acceptSocket)
When a write operation completes, a new read operation is started for the socket.
if stateByKey[completionKey.value][0] == STATE_WRITING:
    # We'll use the completion of the write to start the next read.
    stateByKey[completionKey.value] = StartOverlappedRead(stateByKey[completionKey.value][1])
And when a read operation completes, I see how many bytes were read, and write that many bytes from the stored read buffer out to the relevant socket. If 0 bytes were read, this signals that the socket has been disconnected, and it can be forgotten about.

numberOfBytes is an output of the GetQueuedCompletionStatus call that precedes this code.
if stateByKey[completionKey.value][0] == STATE_READING:
    # We'll use the completion of the read to do the corresponding write.
    _socket, recvBuffer, ovRecv = stateByKey[completionKey.value][1:]

    # No received bytes indicates the connection has disconnected.
    if numberOfBytes.value == 0:
        del stateByKey[completionKey.value]
        print "DISCONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
        return True

    msg = "["+ recvBuffer[0].buf[:numberOfBytes.value] +"]"
    stateByKey[completionKey.value] = StartOverlappedWrite(_socket, msg)
Putting this all together and massaging it a little, gives the following Pump function:
def Pump():
    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    while True:
        ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 500)
        if ret == FALSE:
            err = WSAGetLastError()
            if err == WAIT_TIMEOUT:
                continue
            Cleanup()
            raise WinError(err)
        break

    if completionKey.value == LISTEN_COMPLETION_KEY:
        acceptKey, acceptSocket, ignore = stateByKey[LISTEN_COMPLETION_KEY]
        stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()

        # Do an initial read event on the newly connected socket.
        stateByKey[acceptKey] = StartOverlappedRead(acceptSocket)

        print "CONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
    else:
        stateData = stateByKey[completionKey.value]
        del stateByKey[completionKey.value]

        state = stateData[0]
        if state == STATE_WRITING:
            # We'll use the completion of the write to start the next read.
            stateByKey[completionKey.value] = StartOverlappedRead(stateData[1])
        elif state == STATE_READING:
            # We'll use the completion of the read to do the corresponding write.
            _socket, recvBuffer, ovRecv = stateData[1:]

            # No received bytes indicates the connection has disconnected.
            if numberOfBytes.value == 0:
                print "DISCONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
                return True

            msg = "["+ recvBuffer[0].buf[:numberOfBytes.value] +"]"
            stateByKey[completionKey.value] = StartOverlappedWrite(_socket, msg)
        else:
            Cleanup()
            raise Exception("Unexpected completion key", completionKey, "state", state)

    return True
And at this point the script implements a functional echo server, handling disconnected sockets as intended.
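Stripped of the Winsock calls, the per-connection logic in Pump is a two-state machine. The hypothetical pure-Python model below (the name on_completion and its return values are illustrative only; buffers, sockets and overlapped objects are left out) makes the read, write, read cycle and the zero-byte disconnect rule easy to check in isolation.

```python
STATE_WRITING = 1
STATE_READING = 2


def on_completion(stateByKey, key, numberOfBytes, received=b""):
    """Apply one completion to the echo state machine and return the next action."""
    state = stateByKey.pop(key)
    if state == STATE_WRITING:
        # A finished write is followed by the next read.
        stateByKey[key] = STATE_READING
        return ("read",)
    if state == STATE_READING:
        if numberOfBytes == 0:
            # Zero bytes read: the peer disconnected, so the key stays forgotten.
            return ("disconnected",)
        # Echo back what arrived, wrapped in brackets as Pump does.
        stateByKey[key] = STATE_WRITING
        return ("write", b"[" + received[:numberOfBytes] + b"]")
    raise ValueError("unexpected state %r" % (state,))
```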

There are a few things that could be polished. For instance, there's no need to allocate new buffers or overlapped objects for each socket on every operation; these could easily be reused. But that's not so important.

The tentative long term goal is to build up a base of more or less reasonable code that covers the required range of IO completion port based networking support, in order to potentially write a new stacklesssocket.py module.

Next post: IOCP-based sockets with ctypes in Python: 6
Script source code: 04 - Echoing.py

Saturday, 25 July 2009

IOCP-based sockets with ctypes in Python: 4

Previous post: IOCP-based sockets with ctypes in Python: 3

The next thing I want to do is take what I have written so far and reshape it into a simple, naive server. It will accept any connection made to it, then set that connection aside (and ignore it), ready to accept the next one.

I don't really need to care what IO completion packets get queued for me to receive on my IO completion port. But for the sake of extending my code base a little, I will set the completion key for each socket associated with the port anyway.

The original socket that listens for connections will get a unique value that has been set aside.

LISTEN_COMPLETION_KEY = 90L
CreateIoCompletionPort(listenSocket, hIOCP, LISTEN_COMPLETION_KEY, NULL)
And unique keys will be allocated on demand for the sockets of incoming connections which will get accepted.
currentCompletionKey = 100L

def CreateCompletionKey():
    global currentCompletionKey
    v = currentCompletionKey
    currentCompletionKey += 1L
    return v
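The same allocation can also be sketched with itertools.count, which avoids the global statement. The sketch also keeps each key next to whatever it refers to. The helper RegisterSocket and the stateByKey layout are hypothetical, and nothing here touches Winsock.

```python
import itertools

LISTEN_COMPLETION_KEY = 90

# Keys for accepted sockets start at 100, as in the script above.
_completionKeys = itertools.count(100)

def CreateCompletionKey():
    # next() works on both Python 2.6+ and Python 3.
    return next(_completionKeys)

stateByKey = {}

def RegisterSocket(sock, state):
    """Hypothetical helper: allocate a key and remember what it refers to."""
    key = CreateCompletionKey()
    stateByKey[key] = (sock, state)
    return key
```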
Accepted connections that are set aside to be ignored are stored in a designated dictionary.
overlappedByKey = {}
Because I will need to accept an arbitrary number of incoming connections, rather than just the one the previous version of this script accepted, I've gathered the relevant parts of the script into a function with three additions.

The first addition is the allocation of a unique completion key for the new accept socket.
ovKey = CreateCompletionKey()
The second addition is setting this as the completion key for the socket when it is associated with the port.
CreateIoCompletionPort(_acceptSocket, hIOCP, ovKey, NULL)
The third addition is the returning of three values. The key and the socket are needed, respectively, for identifying which socket a completion packet relates to and for performing further operations on that socket. But as someone on Stack Overflow noted, I also need to hold onto a reference to the OVERLAPPED object used, otherwise Python may garbage collect it at some point, perhaps when the function exits.
return ovKey, _acceptSocket, _ovAccept
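Returning the OVERLAPPED keeps it alive. The completion loop also needs a way back from the pointer that GetQueuedCompletionStatus fills in to the Python-side state for that operation. One possible approach is to key a dictionary on ctypes.addressof. The dictionary both holds a reference to the object and lets a completed pointer be looked up. The sketch below uses a stand-in structure and hypothetical function names, and simulates the completed pointer with ctypes.pointer.

```python
import ctypes
from ctypes import Structure, c_void_p, c_uint32

class OVERLAPPED(Structure):
    # Stand-in with the same leading layout as the Windows structure.
    _fields_ = [
        ("Internal", c_void_p),
        ("InternalHigh", c_void_p),
        ("Offset", c_uint32),
        ("OffsetHigh", c_uint32),
        ("hEvent", c_void_p),
    ]

# Maps the address of each in-flight OVERLAPPED to (overlapped, state).
# Holding the object in the dict stops Python from collecting it while
# the operation is still pending.
pendingByAddress = {}

def StartOperation(state):
    ov = OVERLAPPED()
    pendingByAddress[ctypes.addressof(ov)] = (ov, state)
    return ov

def CompleteOperation(ovCompletedPtr):
    # ovCompletedPtr is a POINTER(OVERLAPPED), like the one that
    # GetQueuedCompletionStatus fills in; cast it to an integer address.
    address = ctypes.cast(ovCompletedPtr, c_void_p).value
    ov, state = pendingByAddress.pop(address)
    return state
```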
The completed function is as follows:
def CreateAcceptSocket():
    ret = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, None, 0, WSA_FLAG_OVERLAPPED)
    if ret == INVALID_SOCKET:
        err = WSAGetLastError()
        closesocket(listenSocket)
        CloseHandle(hIOCP)
        WSACleanup()
        raise WinError(err)

    _acceptSocket = ret
    ovKey = CreateCompletionKey()

    dwBytesReceived = DWORD()
    _ovAccept = OVERLAPPED()

    ret = AcceptEx(listenSocket, _acceptSocket, outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(dwBytesReceived), byref(_ovAccept))
    if ret == FALSE:
        err = WSAGetLastError()
        # The operation was successful and is currently in progress. Ignore this error...
        if err != ERROR_IO_PENDING:
            closesocket(_acceptSocket)
            closesocket(listenSocket)
            CloseHandle(hIOCP)
            WSACleanup()
            raise WinError(err)

    # Bind the accept socket to the IO completion port.
    CreateIoCompletionPort(_acceptSocket, hIOCP, ovKey, NULL)
    return ovKey, _acceptSocket, _ovAccept
This function is incorporated into another one that waits for a connection to be assigned to the new accept socket. Within it, an inner loop repeatedly polls for a completion packet with a short timeout, rather than waiting indefinitely until one is received, for reasons described later.
def Loop():
    global acceptSocket
    acceptKey, acceptSocket, ovAccept = CreateAcceptSocket()

    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    while 1:
        ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 500)
        if ret == FALSE:
            err = WSAGetLastError()
            if err == WAIT_TIMEOUT:
                continue
            Cleanup()
            raise WinError(err)
        break

    if completionKey.value != LISTEN_COMPLETION_KEY:
        Cleanup()
        raise Exception("Unexpected completion key", completionKey, "expected", LISTEN_COMPLETION_KEY)

    overlappedByKey[acceptKey] = acceptSocket
Now I can accept connection after connection as they come in. The only error that should propagate up to this logic is the KeyboardInterrupt exception generated when someone presses control-c. Handling it cleanly looks much better than displaying the traceback.
try:
    while 1:
        Loop()
except KeyboardInterrupt:
    Cleanup()
Problems encountered

Python receives and handles signals, like the pressing of control-c, on its main thread. Because my script is not multithreaded, it runs within that main thread. When an external function (like GetQueuedCompletionStatus) is called, the thread is blocked until it returns.

Initially I called GetQueuedCompletionStatus with an infinite timeout.
ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), INFINITE)
This meant that the Python main thread only had a chance to run when new connections were made, so pressing control-c in the console was not handled until that happened.

Because I want to be able to interrupt my script at any time, I changed the call to happen repeatedly with a short timeout, allowing signals to be handled promptly.
ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 500)
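The shape of this fix can be shown without Windows. In this sketch, a standard-library queue stands in for the completion port, purely for illustration. Each wait is bounded, so control comes back to Python code between waits. In the script, that point is the return from GetQueuedCompletionStatus, and it is where a pending KeyboardInterrupt gets its chance to be raised. The function name and the maxPolls parameter are hypothetical.

```python
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2

def WaitForCompletion(port, pollSeconds=0.5, maxPolls=None):
    """Poll `port` in short slices rather than blocking indefinitely.

    Returns the next completed item, or None if maxPolls slices pass
    without one (maxPolls=None means keep polling forever).
    """
    polls = 0
    while maxPolls is None or polls < maxPolls:
        try:
            return port.get(timeout=pollSeconds)
        except queue.Empty:
            # The equivalent of WAIT_TIMEOUT: nothing completed in this slice,
            # so loop back (giving Python a chance to run) and wait again.
            polls += 1
    return None
```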
Another problem was my misinterpretation of how CreateIoCompletionPort is used. The completion key parameter is declared as a ULONG_PTR, which I read as a pointer.
HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle,
__in_opt HANDLE ExistingCompletionPort,
__in ULONG_PTR CompletionKey,
__in DWORD NumberOfConcurrentThreads
);
So I assumed that I needed to store the key I wanted to use and pass a pointer to it. In fact, ULONG_PTR is an unsigned integer type wide enough to hold a pointer, and the value passed is the completion key itself. I made this same mistake in the past when I wrote an overlapped file IO solution. In order to avoid confusing ctypes, I define the parameter as a plain integer, ignoring the pointer part.
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, c_ulong, DWORD)
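A quick ctypes check makes the ULONG_PTR distinction concrete. This is offered as a caution, not as a change to the script:

* ULONG_PTR is pointer-sized, while c_ulong is 32 bits on every Windows build. c_ulong therefore only matches ULONG_PTR in a 32-bit Python.
* c_size_t tracks the pointer width on mainstream platforms, so it is a reasonable stand-in.

The helper name FitsCompletionKey is hypothetical.

```python
import ctypes

pointerSize = ctypes.sizeof(ctypes.c_void_p)

# size_t and ULONG_PTR are both pointer-sized on mainstream platforms,
# so c_size_t is a reasonable ctypes stand-in for ULONG_PTR.
ULONG_PTR = ctypes.c_size_t

def FitsCompletionKey(ctype):
    """True if `ctype` is wide enough to carry a ULONG_PTR completion key."""
    return ctypes.sizeof(ctype) >= pointerSize
```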
Another misinterpretation I made was assuming that when I had an overlapped AcceptEx call in progress, the completion key I would receive when a connection was accepted would be the one for the relevant accept socket. It turns out that the key given for each accept packet completion is actually the one for the listen socket.

Next step

A good next step, in order to flesh out writing to sockets and handling remote disconnections, is to extend this into an echo server. So that is what I will do. The function Loop really needs to be renamed to AcceptConnection along the way.

Next post: IOCP based sockets with ctypes in Python: 5
Script source code: 03 - Serving.py

Friday, 24 July 2009

IOCP based sockets with ctypes in Python: 3

Previous post: IOCP-based sockets with ctypes in Python: 2

The listen socket needs to be associated with an IO completion port. First I need to create one using CreateIoCompletionPort.

HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle,
__in_opt HANDLE ExistingCompletionPort,
__in ULONG_PTR CompletionKey,
__in DWORD NumberOfConcurrentThreads
);
Defined using ctypes in this way.
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, c_ulong, DWORD)
CreateIoCompletionPort.restype = HANDLE
Now I can create the IO completion port.
NULL = c_ulong()
INVALID_HANDLE_VALUE = HANDLE(-1)
NULL_HANDLE = HANDLE(0)

hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL_HANDLE, NULL, NULL)
# A NULL HANDLE result comes back from ctypes as None, so test truthiness rather than == 0.
if not hIOCP:
    err = WSAGetLastError()
    closesocket(listenSocket)
    WSACleanup()
    raise WinError(err)
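One ctypes detail matters for that failure check. HANDLE in ctypes.wintypes is c_void_p, and ctypes converts a NULL c_void_p result to None rather than 0. A failed call whose restype is HANDLE will therefore not compare equal to 0, and a truthiness test such as `if not hIOCP:` is the safer check. This small portable demonstration uses a hypothetical helper name:

```python
import ctypes

def IsNullHandle(value):
    # A NULL c_void_p result arrives as None; a literal 0 is also treated as NULL.
    return not value

nullResult = ctypes.c_void_p(0).value       # what a NULL HANDLE result becomes
validResult = ctypes.c_void_p(0x1234).value
```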
The listen socket I created needs to be associated with that IO completion port.
CreateIoCompletionPort(listenSocket, hIOCP, NULL, NULL)
The created IO completion port needs to be closed at some point (when all the sockets associated with it have been closed) with CloseHandle.
BOOL WINAPI CloseHandle(
__in HANDLE hObject
);
Defined with ctypes.
CloseHandle = windll.kernel32.CloseHandle
CloseHandle.argtypes = (HANDLE,)
CloseHandle.restype = BOOL
Now I can add the AcceptEx definitions I previously wrote.
class _US(Structure):
    _fields_ = [
        ("Offset", DWORD),
        ("OffsetHigh", DWORD),
    ]

class _U(Union):
    _fields_ = [
        ("s", _US),
        ("Pointer", c_void_p),
    ]
    _anonymous_ = ("s",)

class OVERLAPPED(Structure):
    _fields_ = [
        ("Internal", POINTER(ULONG)),
        ("InternalHigh", POINTER(ULONG)),
        ("u", _U),
        ("hEvent", HANDLE),
        # Custom fields.
        ("channel", py_object),
    ]
    _anonymous_ = ("u",)

AcceptEx = windll.Mswsock.AcceptEx
AcceptEx.argtypes = (SOCKET, SOCKET, c_void_p, DWORD, DWORD, DWORD, POINTER(DWORD), POINTER(OVERLAPPED))
AcceptEx.restype = BOOL
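Because a custom channel field is appended after hEvent, it may be worth confirming that the part of the structure Windows sees keeps its expected size. The sketch below uses portable stand-ins: c_void_p for the pointer-sized fields and c_uint32 for DWORD, which is an assumption that matches the Windows types. The standard portion should then be 20 bytes in a 32-bit process and 32 bytes in a 64-bit one, with the custom field starting right after it.

```python
import ctypes
from ctypes import Structure, Union, c_void_p, c_uint32, py_object, sizeof

class _US(Structure):
    _fields_ = [("Offset", c_uint32), ("OffsetHigh", c_uint32)]

class _U(Union):
    _fields_ = [("s", _US), ("Pointer", c_void_p)]
    _anonymous_ = ("s",)

class OVERLAPPED(Structure):
    _fields_ = [
        ("Internal", c_void_p),       # ULONG_PTR in the Windows headers
        ("InternalHigh", c_void_p),   # ULONG_PTR
        ("u", _U),
        ("hEvent", c_void_p),         # HANDLE
        ("channel", py_object),       # custom field, never touched by Windows
    ]
    _anonymous_ = ("u",)

pointerSize = sizeof(c_void_p)
# Expected size of the standard Windows portion for this process.
expectedStandardSize = 20 if pointerSize == 4 else 32
```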
In order to call AcceptEx, a socket needs to be precreated for the incoming connection to be assigned to.
ret = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, None, 0, WSA_FLAG_OVERLAPPED)
if ret == INVALID_SOCKET:
    err = WSAGetLastError()
    closesocket(listenSocket)
    CloseHandle(hIOCP)
    WSACleanup()
    raise WinError(err)

acceptSocket = ret
And then the AcceptEx call can be made.
FALSE = 0
ERROR_IO_PENDING = 997

dwReceiveDataLength = 0
dwLocalAddressLength = sizeof(sockaddr_in)/2
dwRemoteAddressLength = sizeof(sockaddr_in)/2
outputBuffer = create_string_buffer(2 * sizeof(sockaddr_in))
dwBytesReceived = DWORD()
ovAccept = OVERLAPPED()

ret = AcceptEx(listenSocket, acceptSocket, outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(dwBytesReceived), byref(ovAccept))
if ret == FALSE:
    err = WSAGetLastError()
    # The operation was successful and is currently in progress. Ignore this error...
    if err != ERROR_IO_PENDING:
        closesocket(acceptSocket)
        closesocket(listenSocket)
        CloseHandle(hIOCP)
        WSACleanup()
        raise WinError(err)
The accept socket also needs to be associated with the IO completion port.
CreateIoCompletionPort(acceptSocket, hIOCP, NULL, NULL)
Now incoming connections should be queued on the IO completion port for me to poll using GetQueuedCompletionStatus.
BOOL WINAPI GetQueuedCompletionStatus(
__in HANDLE CompletionPort,
__out LPDWORD lpNumberOfBytes,
__out PULONG_PTR lpCompletionKey,
__out LPOVERLAPPED *lpOverlapped,
__in DWORD dwMilliseconds
);
Which, specified with ctypes, is:
GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
GetQueuedCompletionStatus.argtypes = (HANDLE, POINTER(DWORD), POINTER(c_ulong), POINTER(POINTER(OVERLAPPED)), DWORD)
GetQueuedCompletionStatus.restype = BOOL
For the purposes of writing a test script, I'm going to wait for 10 seconds for an incoming connection.
numberOfBytes = DWORD()
completionKey = c_ulong()
ovCompletedPtr = POINTER(OVERLAPPED)()

ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 10000)
if ret == FALSE:
    err = WSAGetLastError()
    closesocket(acceptSocket)
    closesocket(listenSocket)
    CloseHandle(hIOCP)
    WSACleanup()
    raise WinError(err)
Now to run the script. I opened the listening socket on localhost port 10101, so telnet localhost 10101 will make a connection. My first attempt is going to let the GetQueuedCompletionStatus time out.
Traceback (most recent call last):
  File "02 - Accepting.py", line 264, in <module>
    WSACleanup()
WindowsError: [Error 258] The wait operation timed out.
That is to be expected. Now to rerun the script, but do the telnet before the GetQueuedCompletionStatus call times out.
Traceback (most recent call last):
  File "02 - Accepting.py", line 265, in <module>
    raise WinError(err)
WindowsError: [Error 122] The data area passed to a system call is too small.
Oops. Rereading the AcceptEx documentation, I see that the space needed for each address should be "at least 16 bytes more than the maximum address length for the transport protocol in use". Ah, and my address length calculation was just wrong anyway, regardless of the missing extra 16 bytes. The following lines are changed:
dwReceiveDataLength = 0
dwLocalAddressLength = sizeof(sockaddr_in) + 16
dwRemoteAddressLength = sizeof(sockaddr_in) + 16
outputBuffer = create_string_buffer(dwReceiveDataLength + dwLocalAddressLength + dwRemoteAddressLength)
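Working through the numbers for a 16-byte IPv4 sockaddr_in (2-byte family, 2-byte port, 4-byte address, 8 bytes of padding):

* each address slot is 16 + 16 = 32 bytes;
* with no receive data, the output buffer is 64 bytes.

The sketch below uses a flat, portable stand-in for sockaddr_in, with sin_addr as a single 32-bit value. ADDRESS_PADDING is a name introduced only for illustration.

```python
import ctypes
from ctypes import Structure, c_short, c_ushort, c_uint32, c_char, sizeof

class sockaddr_in(Structure):
    # Flat stand-in for the Winsock layout; sin_addr kept as one 32-bit value.
    _fields_ = [
        ("sin_family", c_short),
        ("sin_port", c_ushort),
        ("sin_addr", c_uint32),
        ("sin_zero", c_char * 8),
    ]

# AcceptEx wants at least 16 bytes more than the address length per slot.
ADDRESS_PADDING = 16

dwReceiveDataLength = 0
dwLocalAddressLength = sizeof(sockaddr_in) + ADDRESS_PADDING
dwRemoteAddressLength = sizeof(sockaddr_in) + ADDRESS_PADDING
outputBuffer = ctypes.create_string_buffer(
    dwReceiveDataLength + dwLocalAddressLength + dwRemoteAddressLength)
```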
I also need to be able to at least naively determine whether the overlapped object I get is one I created. The only one I created was ovAccept, so I misuse my custom channel attribute to store a string.
ovAccept = OVERLAPPED()
ovAccept.channel = "accept"
And add some extra output to the script on success.
print "SUCCESS"
print ovCompletedPtr.contents.channel
Now to paste the script into a python console and see what the output is.
>>> print "SUCCESS"
SUCCESS
>>> print ovCompletedPtr.contents.channel
accept
Now I'll send a message to the accepted connection before disconnecting them. WSASend looks like the right function to call.
int WSASend(
__in SOCKET s,
__in LPWSABUF lpBuffers,
__in DWORD dwBufferCount,
__out LPDWORD lpNumberOfBytesSent,
__in DWORD dwFlags,
__in LPWSAOVERLAPPED lpOverlapped,
__in LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
And the new structure WSABUF.
struct WSABUF {
u_long len;
char FAR *buf;
}
Which gives the following ctypes definitions.
class WSABUF(Structure):
    _fields_ = [
        ("len", c_ulong),
        ("buf", c_char_p),
    ]

WSASend = windll.Ws2_32.WSASend
WSASend.argtypes = (SOCKET, POINTER(WSABUF), DWORD, POINTER(DWORD), DWORD, POINTER(OVERLAPPED), c_void_p)
WSASend.restype = c_int
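The WSABUF fields are plain ctypes data, so filling a WSABUF array can be tried on any platform. This sketch uses a hypothetical helper. It passes bytes so that it also runs on Python 3, where c_char_p expects bytes rather than str. Each entry holds the length and a pointer to its message, and ctypes keeps a reference to the assigned bytes inside the array.

```python
import ctypes
from ctypes import Structure, c_ulong, c_char_p

class WSABUF(Structure):
    _fields_ = [
        ("len", c_ulong),
        ("buf", c_char_p),
    ]

def MakeSendBuffers(*messages):
    """Build a WSABUF array, one entry per message (a hypothetical helper)."""
    buffers = (WSABUF * len(messages))()
    for i, message in enumerate(messages):
        buffers[i].len = len(message)
        # ctypes stores a reference to `message` alongside `buffers`, so the
        # bytes stay alive for as long as the array does.
        buffers[i].buf = message
    return buffers
```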
And now to send the message on the accepted connection.
msg = "Disconnecting you.."
sendBuffer = (WSABUF * 1)()
sendBuffer[0].buf = msg
sendBuffer[0].len = len(msg)
bytesSent = DWORD()
ovSend = OVERLAPPED()
ovSend.channel = "send"
ret = WSASend(acceptSocket, cast(sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
if ret != 0:
    err = WSAGetLastError()
    # The operation was successful and is currently in progress. Ignore this error...
    if err != ERROR_IO_PENDING:
        closesocket(acceptSocket)
        closesocket(listenSocket)
        CloseHandle(hIOCP)
        WSACleanup()
        raise WinError(err)
I'll just repeat the earlier poll for the sake of simplicity.
ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 10000)
if ret == FALSE:
    err = WSAGetLastError()
    closesocket(acceptSocket)
    closesocket(listenSocket)
    CloseHandle(hIOCP)
    WSACleanup()
    raise WinError(err)
The received overlapped object should be the one for the send. Print some output to confirm I got the right one.
if ovCompletedPtr.contents.channel == "send":
    print "SUCCESS"
else:
    print "FAILURE: Unrecognised overlapped object"
At this point the message should have been sent to the accepted connection so I'll now disconnect them and exit.
closesocket(acceptSocket)
closesocket(listenSocket)
CloseHandle(hIOCP)
WSACleanup()
Pasting the script into an open Python console gives the following output:
SUCCESS
And the console where I telneted sees the following output:
Disconnecting you..

Connection to host lost.
That's the initial goal achieved. The final goal is to have low-level IOCP networking code which can be wrapped using Stackless Python with a socket module interface so that it can be used in place of stacklesssocket.py.

Next post: IOCP-based sockets with ctypes in Python: 4
Script source code: 02 - Accepting.py

Edited at 6:25PM on 2009-07-25:

Was:
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, POINTER(c_ulong), DWORD)
CreateIoCompletionPort.restype = HANDLE
Changed to:
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.argtypes = (HANDLE, HANDLE, c_ulong, DWORD)
CreateIoCompletionPort.restype = HANDLE
The pointer type, while it seemed to match the C++ declaration, had confused me and was leading me to pass in a pointer to a value, rather than the value itself, in later code based on this.

Wednesday, 22 July 2009

IOCP based sockets with ctypes in Python: 2

Previous post: IOCP-based sockets with ctypes in Python: 1

My current goal is to get basic asynchronous socket IO working using Winsock via ctypes. This post documents the steps I took today: defining Windows functions in an interactive Python console and using them as I go, so that I end up with a working, tested result.

I have a standard set of import statements I have used to get everything I need directly available from ctypes.

from ctypes import windll, pythonapi
from ctypes import c_bool, c_char, c_ubyte, c_int, c_uint, c_short, c_ushort, c_long, c_ulong, c_void_p, byref, c_char_p, Structure, Union, py_object, POINTER, pointer, cast, sizeof, create_string_buffer
from ctypes.wintypes import HANDLE, ULONG, DWORD, BOOL, LPCSTR, LPCWSTR, WinError, WORD
Before Winsock can be used, a call to WSAStartup needs to be made to initiate it for the current process.
int WSAStartup(
__in WORD wVersionRequested,
__out LPWSADATA lpWSAData
);
And before this function can be defined, the WSADATA structure it references needs to have already been defined. And the definition of WSADATA requires the values of the constants WSADESCRIPTION_LEN and WSASYS_STATUS_LEN which need to be searched for in Windows header files.
WSADESCRIPTION_LEN = 256
WSASYS_STATUS_LEN = 128

class WSADATA(Structure):
    _fields_ = [
        ("wVersion", WORD),
        ("wHighVersion", WORD),
        ("szDescription", c_char * (WSADESCRIPTION_LEN+1)),
        ("szSystemStatus", c_char * (WSASYS_STATUS_LEN+1)),
        ("iMaxSockets", c_ushort),
        ("iMaxUdpDg", c_ushort),
        ("lpVendorInfo", c_char_p),
    ]
At this point, the prerequisites are present so that the WSAStartup function can also be defined.
WSAStartup = windll.Ws2_32.WSAStartup
WSAStartup.argtypes = (WORD, POINTER(WSADATA))
WSAStartup.restype = c_int
With WSAStartup defined, it can now be called.
def MAKEWORD(bLow, bHigh):
    return (bHigh << 8) + bLow

wsaData = WSADATA()
LP_WSADATA = POINTER(WSADATA)
ret = WSAStartup(MAKEWORD(2, 2), LP_WSADATA(wsaData))
if ret != 0:
    raise WinError(ret)
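As a check of the arithmetic, MAKEWORD(2, 2) is 0x0202 (514). Winsock reads the major version from the low byte and the minor version from the high byte. The LOBYTE and HIBYTE helpers below mirror the Windows header macros of the same names. They could be used to read wVersion back out of WSADATA after the call, but they are a sketch and not part of the script.

```python
def MAKEWORD(bLow, bHigh):
    return (bHigh << 8) + bLow

def LOBYTE(w):
    return w & 0xFF

def HIBYTE(w):
    return (w >> 8) & 0xFF

requested = MAKEWORD(2, 2)
# Winsock takes the major version from the low byte, the minor from the high.
major, minor = LOBYTE(requested), HIBYTE(requested)
```

Separately, byref(wsaData) would pass the structure just as well as LP_WSADATA(wsaData), and the ctypes documentation describes byref as the faster option.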
This works in the console and I can move onto creating sockets.

The first step is to define the WSASocket function.
SOCKET WSASocket(
__in int af,
__in int type,
__in int protocol,
__in LPWSAPROTOCOL_INFO lpProtocolInfo,
__in GROUP g,
__in DWORD dwFlags
);
This is as straightforward as it gets, once the datatype of GROUP is tracked down in the Windows header files. The LPWSAPROTOCOL_INFO argument can be dismissed with a NULL value. Defining and using socket would be easier, but it doesn't allow me to pass in a flag indicating that the created socket should work in an overlapped manner.
GROUP = c_uint
SOCKET = c_uint

WSASocket = windll.Ws2_32.WSASocketA
WSASocket.argtypes = (c_int, c_int, c_int, c_void_p, GROUP, DWORD)
WSASocket.restype = SOCKET
And after locating the constants needed for the arguments from the Windows header files, I can now call the function.
AF_INET = 2
SOCK_STREAM = 1
IPPROTO_TCP = 6
WSA_FLAG_OVERLAPPED = 0x01
# SOCKET is unsigned, so compute the all-ones value in that type (~0 alone is -1).
INVALID_SOCKET = SOCKET(~0).value

ret = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, None, 0, WSA_FLAG_OVERLAPPED)
if ret == INVALID_SOCKET:
    raise WinError()
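A plain ~0 is a trap in this comparison. In Python, ~0 is -1, but with restype set to c_uint ctypes returns a non-negative integer. A failed call would therefore return 4294967295, which never equals ~0. Passing the constant through the same ctypes type gives a value that does compare correctly. This is a portable sketch:

```python
import ctypes

SOCKET = ctypes.c_uint

# ~0 is -1 as a Python int, but the failing result arrives as unsigned.
naiveInvalidSocket = ~0
INVALID_SOCKET = SOCKET(~0).value   # wraps to the all-ones unsigned value

failedResult = SOCKET(-1).value     # what restype=c_uint would hand back
```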
I'll be reusing 'ret' so I'll store it in another variable.
listenSocket = ret
The next step will be to listen on a socket and accept incoming connections on it, then to read data from the other sockets whose connections are accepted. So now I need to call listen on the socket I have just created. First, to define it:
int listen(
__in SOCKET s,
__in int backlog
);
listen = windll.Ws2_32.listen
listen.argtypes = (SOCKET, c_int)
listen.restype = c_int
And to call it..
SOMAXCONN = 0x7fffffff

ret = listen(listenSocket, SOMAXCONN)
if ret != 0:
    raise WinError()
Oops.
WindowsError: [Error 10022] An invalid argument was supplied.
listenSocket should be a valid socket, so back to the documentation.
A descriptor identifying a bound, unconnected socket.
Oh, that's right, I need to bind it first.
int bind(
__in SOCKET s,
__in const struct sockaddr *name,
__in int namelen
);
I don't have a definition for sockaddr, so that needs to be made first.
struct in_addr {
union {
struct {
u_char s_b1,s_b2,s_b3,s_b4;
} S_un_b;
struct {
u_short s_w1,s_w2;
} S_un_w;
u_long S_addr;
} S_un;
}

struct sockaddr_in {
short sin_family;
u_short sin_port;
struct in_addr sin_addr;
char sin_zero[8];
};
And the corresponding Python definition.
class _UN_b(Structure):
    _fields_ = [
        ("s_b1", c_ubyte),
        ("s_b2", c_ubyte),
        ("s_b3", c_ubyte),
        ("s_b4", c_ubyte),
    ]

class _UN_w(Structure):
    _fields_ = [
        ("s_w1", c_ushort),
        ("s_w2", c_ushort),
    ]

class _UN(Structure):
    _fields_ = [
        ("S_un_b", _UN_b),
        ("S_un_w", _UN_w),
        ("S_addr", c_ulong),
    ]

class in_addr(Union):
    _fields_ = [
        ("S_un", _UN),
    ]
    _anonymous_ = ("S_un",)

class sockaddr_in(Structure):
    _fields_ = [
        ("sin_family", c_short),
        ("sin_port", c_ushort),
        ("sin_addr", in_addr),
        ("sin_zero", c_char * 8),
    ]

sockaddr_inp = POINTER(sockaddr_in)
The definition seems to work, now to define bind and try it out.
bind = windll.Ws2_32.bind
bind.argtypes = (SOCKET, POINTER(sockaddr_in), c_int)
bind.restype = c_int
However, the values for sin_addr and sin_port need to be calculated and adjusted to suit the relevant data structures in sockaddr_in.
u_short WSAAPI htons(
__in u_short hostshort
);

struct hostent* FAR gethostbyname(
__in const char *name
);

struct hostent {
char FAR * h_name;
char FAR FAR **h_aliases;
short h_addrtype;
short h_length;
char FAR FAR **h_addr_list;
}
Which require the following definitions.
class hostent(Structure):
    _fields_ = [
        ("h_name", c_char_p),
        ("h_aliases", POINTER(c_char_p)),
        ("h_addrtype", c_short),
        ("h_length", c_short),
        ("h_addr_list", POINTER(c_char_p)),
    ]

hostentp = POINTER(hostent)

gethostbyname = windll.Ws2_32.gethostbyname
gethostbyname.argtypes = (c_char_p,)
gethostbyname.restype = hostentp

inet_addr = windll.Ws2_32.inet_addr
inet_addr.argtypes = (c_char_p,)
inet_addr.restype = c_ulong

inet_ntoa = windll.Ws2_32.inet_ntoa
inet_ntoa.argtypes = (in_addr,)
inet_ntoa.restype = c_char_p

htons = windll.Ws2_32.htons
htons.argtypes = (c_ushort,)
htons.restype = c_ushort
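Both conversions can be sanity-checked with the standard library on any platform:

* htons puts the port into network byte order.
* Winsock's inet_addr returns the four address bytes in network order, read back as a native unsigned integer.

The helpers below are hypothetical re-implementations using the struct module. They are checked against socket.htons and socket.ntohl rather than against Winsock itself.

```python
import socket
import struct

def PortableHtons(port):
    # Pack big-endian (network order), then reread in native order.
    return struct.unpack("=H", struct.pack("!H", port))[0]

def PortableInetAddr(ip):
    # inet_aton gives the 4 address bytes in network order; reading them as a
    # native unsigned 32-bit value matches what the S_addr field holds.
    return struct.unpack("=I", socket.inet_aton(ip))[0]
```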
This should cover it, and the calling of bind should now be possible. A sockaddr_in structure needs to be passed to it, so the data needed to populate that structure needs to be located first.
hostdata = gethostbyname("")
ip = inet_ntoa(cast(hostdata.contents.h_addr_list, POINTER(in_addr)).contents)
Something is going wrong here.
ValueError: Procedure probably called with too many arguments (8 bytes in excess)
Some testing..
ia = cast(hostdata.contents.h_addr_list, POINTER(in_addr)).contents
print sizeof(ia)
print sizeof(in_addr)
This gives the following output.
12
12
So, whatever is going wrong is not obvious. This is already taking long enough, so I will stick with localhost as the ip address for now.
port = 10101
ip = "127.0.0.1"

sa = sockaddr_in()
sa.sin_family = AF_INET
sa.sin_addr.S_addr = inet_addr(ip)
sa.sin_port = htons(port)

SOCKET_ERROR = -1

ret = bind(listenSocket, sockaddr_inp(sa), sizeof(sa))
if ret == SOCKET_ERROR:
    raise WinError()
This works fine, now to try listening again.
ret = listen(listenSocket, SOMAXCONN)
if ret != 0:
    raise WinError()
And this now works. Windows pops up the standard dialog window mentioning that Python has been blocked from accepting incoming connections, and asks if I want to allow it. After allowing Python to accept incoming connections, port 10101 can now be telneted to, which demonstrates that the socket is actually being listened on. The next step is to accept connections, which can wait until whenever I get the time to finish that code.
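For comparison, the standard socket module does the same create, bind and listen sequence in a few lines. It can act as a reference point when checking the ctypes version. The function name is hypothetical. Port 0 is an assumption for the sketch: it lets the OS pick a free port and avoids clashing with the 10101 used above.

```python
import socket

def OpenListeningSocket(host="127.0.0.1", port=0, backlog=socket.SOMAXCONN):
    # Same order as the Winsock calls: create, bind, then listen.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
    try:
        listener.bind((host, port))
        listener.listen(backlog)
    except socket.error:
        listener.close()
        raise
    return listener
```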

Next post: IOCP-based sockets with ctypes in Python: 3
Script source code: 01 - Listening.py

IOCP based sockets with ctypes in Python: 1

As I was reminded of recently, select in the Python library only handles 512 sockets at a time on Windows. This prompted me to spend a little time writing socket IO completion port support using ctypes.

Writing code that uses C-level DLL functionality with ctypes requires that all structures, constants and functions be redefined at the Python level. This can be a somewhat tedious process, slowing down the progress that an interactive prompt otherwise allows. Developing an extension module in C or C++ would probably be faster, but that brings its own tradeoffs in terms of framework structure and other inherent costs.

Defining a function is straightforward, apart from the obstacle of first defining any structures it requires that are not yet defined. For instance, this is the prototype for AcceptEx:

BOOL AcceptEx(
__in SOCKET sListenSocket,
__in SOCKET sAcceptSocket,
__in PVOID lpOutputBuffer,
__in DWORD dwReceiveDataLength,
__in DWORD dwLocalAddressLength,
__in DWORD dwRemoteAddressLength,
__out LPDWORD lpdwBytesReceived,
__in LPOVERLAPPED lpOverlapped
);
In order to specify this function definition with ctypes, the OVERLAPPED structure needs to be defined first in this way.
class _US(Structure):
    _fields_ = [
        ("Offset", DWORD),
        ("OffsetHigh", DWORD),
    ]

class _U(Union):
    _fields_ = [
        ("s", _US),
        ("Pointer", c_void_p),
    ]

    _anonymous_ = ("s",)

class OVERLAPPED(Structure):
    _fields_ = [
        ("Internal", POINTER(ULONG)),
        ("InternalHigh", POINTER(ULONG)),

        ("u", _U),

        ("hEvent", HANDLE),

        # Custom fields.
        ("channel", py_object),
    ]

    _anonymous_ = ("u",)
I wrote that several years ago, and like most ctypes-based code, it took a reasonable amount of time. Interestingly, I've seen at least one other project that has copied it verbatim (minus the Stackless 'channel' field) in order to avoid this effort. Given a definition for this structure, the function definition can also be written. The first step is to get a reference to the function from the DLL it is implemented within.
AcceptEx = windll.Mswsock.AcceptEx
Next the types of arguments it takes are specified.
AcceptEx.argtypes = (SOCKET, SOCKET, c_void_p, DWORD, DWORD, DWORD, POINTER(DWORD), POINTER(OVERLAPPED))
And finally, the type of the value it returns.
AcceptEx.restype = BOOL
At this point, AcceptEx is callable and can be used as needed. But using it requires that the functions which create sockets also be defined, along with whatever other functions you may need.

I often find myself writing all the definitions consecutively in a text editor. This is slow and time consuming, and you have no idea whether the code actually works. It is easy to make trivial mistakes like specifying the wrong DLL for the function to be obtained from. A much more rewarding approach in my experience is to write the code line by line in an interactive Python console, and once each aspect works, compose them into a script in a text editor. This script can be pasted into a new console when you resume the effort, the next time you find the time and interest.