Friday, 31 July 2009

IOCP-based sockets with ctypes in Python: 7

Previous post: IOCP-based sockets with ctypes in Python: 6

In the last post, I managed to get a minimal amount of standard socket code working within my script, making use of the functionality Stackless Python provides. But if I am to make a proper replacement socket module, I need to be able to use it in the same way stacklesssocket.py is used. To this end, the use case I want to support is Andrew Dalke's "more naturally usable" example script.

As can be seen in the script, the replacement socket module used to be put in place with the following code:

import stacklesssocket
sys.modules["socket"] = stacklesssocket
Back in Python 2.5, when stacklesssocket.py was being developed, the socket module was a lot simpler and making a complete replacement module was feasible. These days, however, in later versions of Python 2 and also in Python 3, the socket module has become somewhat more complicated, and leaving the standard version in place while substituting parts of it became the practical approach. This led to new code to substitute in that functionality:
import stacklesssocket
stacklesssocket.install()
The goal

Andrew's example script, which is what I am aiming to support, contains the following code:
import sys
import stacklesssocket
import stackless

stacklesssocket.install()

import urllib
import time

def download(uri):
    t1 = time.time()
    f = urllib.urlopen(uri)
    s = f.read()
    t2 = time.time()
    print "Downloaded", uri, "in", "%.1f" % (t2-t1), "seconds"
    return t2-t1


print " === Serial === "
t1 = time.time()
download("http://www.stackless.com/wiki/Tasklets")
download("http://www.stackless.com/wiki/Channels")
t2 = time.time()
print " --->", t2-t1

print " === Parallel === "
t1 = time.time()
stackless.tasklet(download)("http://www.stackless.com/wiki/Tasklets")
stackless.tasklet(download)("http://www.stackless.com/wiki/Channels")
stackless.run()
t2 = time.time()
print " --->", t2-t1
The most important aspect of this is that it uses another, higher-level standard library module (urllib), which in turn uses the standard socket module. By using my replacement functionality in this way, I can verify how compatible and stable it might be.

The implementation

The original stacklesssocket.py module was based on asyncore, another framework that wraps select. Back when I wrote it, I was glad to have it there to use, but the fact is that adopting a framework brings in dependencies and arbitrary abstraction. If I had the time and interest, I would rewrite stacklesssocket.py to use select directly.

At this time, I am developing for Stackless Python 2.6.2. Right off the bat, it was obvious that this new module was going to be somewhat simpler. stacklesssocket.py made several substitutions to the standard socket module, but in this case it looks like I just need to substitute my own _realsocket object.
def install():
    stdsocket._realsocket = socket
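Substituting a single name works because, as I read Python 2.6's socket.py, its wrapper class creates the underlying object by looking up the module-level global _realsocket each time a socket is constructed. The following sketch imitates that with a throwaway module so it can run anywhere (it also runs under Python 3); fake_stdsocket, StandardRealSocket and ReplacementSocket are hypothetical names, not part of the real socket module.

```python
import types

# Hypothetical stand-in for the standard socket module.
fake_stdsocket = types.ModuleType("fake_stdsocket")


class StandardRealSocket(object):
    origin = "standard"


class ReplacementSocket(object):
    origin = "replacement"


fake_stdsocket._realsocket = StandardRealSocket

# Define the module's public factory inside the module's own namespace, so
# that _realsocket is a genuine module-level global looked up at call time.
exec(
    "def socket():\n"
    "    return _realsocket()\n",
    fake_stdsocket.__dict__,
)


def install(stdsocket=fake_stdsocket):
    # Swap only the low-level class; the rest of the module stays as-is.
    stdsocket._realsocket = ReplacementSocket


# Code that grabbed the factory before install() still benefits afterwards.
socket_factory = fake_stdsocket.socket
before = socket_factory().origin
install()
after = socket_factory().origin
print(before, after)  # standard replacement
```

This is also why modules such as urllib, which only ever call socket.socket(), pick up the replacement without knowing about it.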
The first obstacle was the lack of a connect method on my replacement socket object. Checking the available functions on MSDN, ConnectEx was the one that suited my overlapped IO needs. However, I was unable to find it exported from any Winsock-related DLL. It turns out you need to call WSAIoctl in order to get a pointer to it. I had been avoiding WSAIoctl as I assumed it was only used to make faster function calls, but since it is the gateway to functionality I need, avoiding it is no longer an option.
int WSAIoctl(
    __in   SOCKET s,
    __in   DWORD dwIoControlCode,
    __in   LPVOID lpvInBuffer,
    __in   DWORD cbInBuffer,
    __out  LPVOID lpvOutBuffer,
    __in   DWORD cbOutBuffer,
    __out  LPDWORD lpcbBytesReturned,
    __in   LPWSAOVERLAPPED lpOverlapped,
    __in   LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
The corresponding ctypes definition is as follows:
WSAIoctl = windll.Ws2_32.WSAIoctl
WSAIoctl.argtypes = (SOCKET, DWORD, c_void_p, DWORD, c_void_p, DWORD, POINTER(DWORD), POINTER(OVERLAPPED), c_void_p)
WSAIoctl.restype = c_int
The appropriate dwIoControlCode value to get a function pointer is SIO_GET_EXTENSION_FUNCTION_POINTER, and lpvInBuffer must point to the following GUID (WSAID_CONNECTEX), which identifies ConnectEx as the function I want.
{ 0x25a207b9,0xddf3,0x4660, { 0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e }}
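In ctypes, that GUID needs a structure matching the Windows layout (a 32-bit Data1, two 16-bit fields, then 8 bytes). A minimal sketch follows; the field names follow the Windows headers, and SIO_GET_EXTENSION_FUNCTION_POINTER is computed from its header definition, _WSAIORW(IOC_WS2, 6). Nothing here calls Winsock, so it runs on any platform.

```python
from ctypes import Structure, c_ubyte, c_uint16, c_uint32, sizeof


class GUID(Structure):
    # Windows GUID layout: DWORD Data1; WORD Data2; WORD Data3; BYTE Data4[8].
    _fields_ = [
        ("Data1", c_uint32),
        ("Data2", c_uint16),
        ("Data3", c_uint16),
        ("Data4", c_ubyte * 8),
    ]


WSAID_CONNECTEX = GUID(
    0x25a207b9, 0xddf3, 0x4660,
    (c_ubyte * 8)(0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e),
)

# Winsock ioctl codes combine direction bits, a family, and a number:
# _WSAIORW(x, y) is IOC_INOUT | x | y, where IOC_INOUT = IOC_IN | IOC_OUT.
IOC_IN = 0x80000000
IOC_OUT = 0x40000000
IOC_WS2 = 0x08000000
SIO_GET_EXTENSION_FUNCTION_POINTER = IOC_IN | IOC_OUT | IOC_WS2 | 6

print(sizeof(WSAID_CONNECTEX), hex(SIO_GET_EXTENSION_FUNCTION_POINTER))
# 16 0xc8000006
```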
I'm not doing this as an overlapped operation, as I can't see how it could end up blocking in any meaningful way. This resulted in the following code in the replacement socket's __init__ method:
        self.wsFnConnectEx = ConnectExFunc()
        dwBytes = DWORD()
        ret = WSAIoctl(_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, byref(WSAID_CONNECTEX), sizeof(WSAID_CONNECTEX), byref(self.wsFnConnectEx), sizeof(self.wsFnConnectEx), byref(dwBytes), cast(0, POINTER(OVERLAPPED)), 0)
        if ret == SOCKET_ERROR:
            err = WSAGetLastError()
            # Close the socket handle itself, not the SOCKET_ERROR return value.
            closesocket(_socket)
            raise WinError(err)
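The ConnectExFunc prototype used above is not shown in the post, so the following is only a guess at what it could look like, based on the documented C signature of ConnectEx. The simplified OVERLAPPED, and the SOCKET and DWORD aliases, are assumptions for this sketch. It shows the key point of the WSAIoctl call: a freshly created ctypes function pointer is NULL, and its pointer-sized storage is what WSAIoctl fills in.

```python
import ctypes
from ctypes import POINTER, Structure, c_int, c_size_t, c_uint32, c_void_p, sizeof

# ConnectEx is stdcall on Windows (WINFUNCTYPE); CFUNCTYPE is only a fallback
# so this sketch can be built and inspected on other platforms.
FUNCTYPE = getattr(ctypes, "WINFUNCTYPE", ctypes.CFUNCTYPE)

BOOL = c_int
DWORD = c_uint32
SOCKET = c_size_t  # UINT_PTR in the Windows headers


class OVERLAPPED(Structure):
    # Simplified: in the real header Offset/OffsetHigh share a union with Pointer.
    _fields_ = [
        ("Internal", c_size_t),
        ("InternalHigh", c_size_t),
        ("Offset", DWORD),
        ("OffsetHigh", DWORD),
        ("hEvent", c_void_p),
    ]


# BOOL ConnectEx(SOCKET s, const struct sockaddr *name, int namelen,
#                PVOID lpSendBuffer, DWORD dwSendDataLength,
#                LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);
ConnectExFunc = FUNCTYPE(BOOL, SOCKET, c_void_p, c_int, c_void_p, DWORD,
                         POINTER(DWORD), POINTER(OVERLAPPED))

# A function pointer created without arguments is NULL; WSAIoctl writes the
# real address into its storage (hence byref(...) and sizeof(...) in the call).
fn = ConnectExFunc()
print(bool(fn), sizeof(fn) == sizeof(c_void_p))  # False True
```

With name declared as c_void_p, a sockaddr_in structure would have to be passed as byref(sa); since the script passes sa directly, its real prototype presumably declares that parameter as a pointer to its sockaddr_in type.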
With this out of the way, I can now move on to fleshing out the connect method.
    def connect(self, address):
        host, port = address

        self.bind(("0.0.0.0", 0))

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        bytesSent = DWORD(0)
        ovConnect = OVERLAPPED()
        c = ovConnect.channel = stackless.channel()

        ret = self.wsFnConnectEx(self._socket, sa, sizeof(sa), 0, 0, NULL, byref(ovConnect))
        if ret == FALSE:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                raise WinError(err)

        activeIO[self._socket] = c
        c.receive()
ConnectEx did not work on the first attempt; it turns out that a socket must be bound to a local address before ConnectEx can use it, which is why connect starts by calling bind. Oh well, time to retry Andrew's script and see how I go.
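The connect method packs the destination into a sockaddr_in by hand before handing it to ConnectEx. The following portable sketch shows that packing; it uses the standard socket module's inet_aton and htons in place of the Winsock inet_addr and htons the script calls, and simplifies in_addr to its s_addr member (Winsock's IN_ADDR is really a union). make_sockaddr_in is a hypothetical helper.

```python
import socket
import struct
from ctypes import Structure, c_char, c_short, c_uint32, c_ushort, sizeof


class in_addr(Structure):
    # Simplified: only the s_addr view of Winsock's IN_ADDR union is kept.
    _fields_ = [("s_addr", c_uint32)]


class sockaddr_in(Structure):
    _fields_ = [
        ("sin_family", c_short),
        ("sin_port", c_ushort),
        ("sin_addr", in_addr),
        ("sin_zero", c_char * 8),
    ]


def make_sockaddr_in(host, port):
    sa = sockaddr_in()
    sa.sin_family = int(socket.AF_INET)
    # inet_addr returns the address already in network byte order, held in a
    # native integer; reading the packed bytes natively ("=I") reproduces that.
    sa.sin_addr.s_addr = struct.unpack("=I", socket.inet_aton(host))[0]
    sa.sin_port = socket.htons(port)
    return sa


sa = make_sockaddr_in("127.0.0.1", 80)
raw = bytes(sa)
print(sizeof(sa), raw[2:4], raw[4:8])  # 16 b'\x00P' b'\x7f\x00\x00\x01'
```

The raw bytes show the port and address laid out in network byte order regardless of the host's own endianness, which is what the kernel expects to find in the structure.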

This time, the lack of a sendall method was preventing the script from running. sendall simply keeps calling send on the data that remains, until all of it has been sent.
    def sendall(self, msg, flags=None):
        bytesSent = self.send(msg)
        while bytesSent < len(msg):
            bytesSent += self.send(msg[bytesSent:])
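The same loop can be exercised without a network by passing in any callable that behaves like send and reports how many bytes it accepted. In the following sketch, short_send is a hypothetical stand-in that accepts at most four bytes per call. One deliberate difference from the method above: it raises if send reports no progress, rather than looping forever.

```python
def sendall(send, msg):
    """Call send(data) -> number of bytes accepted until all of msg is sent."""
    total = 0
    while total < len(msg):
        sent = send(msg[total:])
        if sent <= 0:
            # Guard against spinning forever if nothing is being accepted.
            raise IOError("send() made no progress")
        total += sent
    return total


# Usage with a hypothetical send that accepts at most 4 bytes per call.
chunks = []


def short_send(data):
    piece = data[:4]
    chunks.append(piece)
    return len(piece)


sent = sendall(short_send, b"GET / HTTP/1.0\r\n\r\n")
print(sent, len(chunks))  # 18 5
```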
At this point I decided to take into account the fact that operations performed with an overlapped object may actually complete right away. However, my mistake was in assuming that if this happened, no completion packet would arrive through GetQueuedCompletionStatus. This led to crashes when I examined the channel attribute on arrived packets whose overlapped object had already been garbage collected, because the function that started the operation had exited.

In order to clean things up, I decided to store a reference to the overlapped object in a global variable, and in addition to move the channel reference I hold out of the overlapped object so that it is stored alongside it. I could have stored the socket handle in the overlapped object, but since CreateIoCompletionPort allows me to associate a completion key with the socket, I store it there instead.
        # Bind the socket to the shared IO completion port.
        CreateIoCompletionPort(_socket, hIOCP, _socket, NULL)
The idea was that I could then ignore packets that arrived for earlier operations which had actually completed immediately. Of course, code that has just got the result of one IO operation may go on to start another, which this time might block. This means that unwanted packets for the operations that had completed immediately would come in and be taken as belonging to the follow-up blocking operation. Errors ensued. So each completion packet needs to be matched up with the operation state stored for it.
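Matching packets to operations amounts to keying the stored state on both the completion key (the socket handle) and a per-operation ID. The following sketch models that bookkeeping in plain Python; ActiveOps, the made-up handle and the string states are hypothetical, standing in for the script's activeOps dictionary and its (channel, overlapped) tuples.

```python
import itertools


class ActiveOps(object):
    """Hypothetical registry: (socket handle, op id) -> per-operation state.

    Holding the state here also keeps any overlapped object alive until its
    completion packet has been dequeued, even if the starting function has
    already returned.
    """

    def __init__(self):
        self._ids = itertools.count(1)
        self._ops = {}

    def start(self, sock_handle, state):
        op_id = next(self._ids)
        self._ops[(sock_handle, op_id)] = state
        return op_id

    def complete(self, sock_handle, op_id):
        # Remove and return exactly the operation this packet belongs to.
        return self._ops.pop((sock_handle, op_id))

    def __len__(self):
        return len(self._ops)


ops = ActiveOps()
SOCK = 0x1a4  # made-up socket handle, used as the IOCP completion key

first = ops.start(SOCK, "recv #1")   # completes immediately; caller moves on
second = ops.start(SOCK, "recv #2")  # this one blocks, waiting for its packet

# The packet for the first operation still arrives, but it is matched by op id,
# so it cannot be mistaken for the pending second operation.
r1 = ops.complete(SOCK, first)
r2 = ops.complete(SOCK, second)
print(r1, r2, len(ops))  # recv #1 recv #2 0
```

A single global counter is used here; since the key also includes the socket handle, a per-socket counter like the script's __getnextopid would work just as well.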

This left connect in the following state:
    def connect(self, address):
        host, port = address

        self.bind(("0.0.0.0", 0))

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        bytesSent = DWORD(0)
        ovConnect = OVERLAPPED()
        opID = ovConnect.opID = self.__getnextopid()
        c = stackless.channel()
        c.preference = 0
        ovConnect.label = "connect"

        activeOps[(self._socket, opID)] = (c, ovConnect)

        ret = self.wsFnConnectEx(self._socket, sa, sizeof(sa), 0, 0, NULL, byref(ovConnect))
        if ret == FALSE:
            err = WSAGetLastError()
            if err != ERROR_IO_PENDING:
                # No completion packet will arrive, so drop the stored state.
                del activeOps[(self._socket, opID)]
                raise WinError(err)

        c.receive()
The other overlapped IO operations also needed to be changed to match, along with _DispatchIOCP.
def _DispatchIOCP():
    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    def _GetCompletionChannel(completionKey, overlappedPtr):
        _socket = completionKey.value
        opID = overlappedPtr.contents.opID
        k = _socket, opID
        c, ovRef = activeOps[k]
        del activeOps[k]

        return c

    while stackless.runcount > 2:
        while True:
            stackless.schedule()

            c = None
            ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 50)
            if ret == FALSE:
                err = WSAGetLastError()
                if err == WAIT_TIMEOUT:
                    continue

                if not bool(ovCompletedPtr):
                    raise WinError(err)

                c = _GetCompletionChannel(completionKey, ovCompletedPtr)
                c.send_exception(WinError, err)
                continue

            break

        c = _GetCompletionChannel(completionKey, ovCompletedPtr)
        if c.balance == -1:
            c.send(numberOfBytes.value)
Note that accessing ovCompletedPtr.contents raises an error if the pointer is NULL; the safe way to find out whether a ctypes pointer is NULL is to check whether its boolean value is False.
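Both behaviours are easy to confirm in isolation. In the following sketch, OVERLAPPED_STUB and completed_op_id are hypothetical; the stub declares opID as a real structure field, which is an assumption about how the script's OVERLAPPED is laid out. This matters because .contents builds a new Python object over the same memory each time, so an attribute merely set on the original Python object (rather than declared in _fields_) would not be visible through the dequeued pointer.

```python
from ctypes import POINTER, Structure, c_ulong, pointer


class OVERLAPPED_STUB(Structure):
    # Hypothetical: opID as a real field, so it lives in the structure's memory.
    _fields_ = [("opID", c_ulong)]


def completed_op_id(ov_ptr):
    """Return the opID behind ov_ptr, or None if the pointer is NULL."""
    if not ov_ptr:  # NULL ctypes pointers are falsy
        return None
    return ov_ptr.contents.opID


null_ptr = POINTER(OVERLAPPED_STUB)()
try:
    null_ptr.contents
except ValueError as exc:  # ctypes refuses to dereference NULL
    print("ValueError:", exc)

print(completed_op_id(null_ptr), completed_op_id(pointer(OVERLAPPED_STUB(7))))
# None 7
```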

Running Andrew's script at this point results in it exiting cleanly, but mid-execution. The reason for this is the condition stackless.runcount > 2. The theory was that this would keep the loop running as long as there were more than two tasklets in the scheduler, but this is just plain wrong and ill thought out. The fact is that at any given time, at least for Andrew's script, the runcount is only likely to be one, because all the other tasklets doing IO will be outside of the scheduler, blocked on a channel. So the loop may need to keep running even when runcount is as low as one.

Taking this into account, I added a weakref dictionary to track the socket objects which were live and added it as an additional reason to loop. This in no way addresses the above problem, but it does the trick. Andrew's script now runs, and gives the following output:
 === Serial === 
Downloaded http://www.stackless.com/wiki/Tasklets in 3.4 seconds
Downloaded http://www.stackless.com/wiki/Channels in 3.9 seconds
 ---> 7.26900005341
 === Parallel === 
Downloaded http://www.stackless.com/wiki/Channels in 3.3 seconds
Downloaded http://www.stackless.com/wiki/Tasklets in 3.5 seconds
 ---> 3.51300001144
Success (of a sort)!

Of course the module in no way provides the full functionality of the standard version yet. Trivial aspects no-one in their right mind should be using, like UDP and so forth, are completely unsupported.

Script source code: 06 - Polished Socket.py
