Saturday 1 August 2009

Merging Stackless Python 3.1

Releasing a new version of Stackless Python is never a straightforward experience. There is always a range of presumably avoidable problems that pop up and slow the process down.

This time the problems encountered were:

  • Pre-existing failures in the Python test suite.
  • Changes in the standard Python pickling unit test approach.
  • Dynamic behaviour in the unittest.py script involving itself in the Stackless function pickling code.
  • TortoiseSVN giving a range of incomprehensible errors.

Pre-existing standard unit test failures

When Python unit tests fail in the Stackless build, a good first step in tracking them down is to define STACKLESS_OFF, which compiles the Stackless source code as standard Python. If the tests still fail there, it might be a mismerge, or it might be that the tests also fail in the standard Python branch that was merged from. Compiling and testing that standard Python branch settles it: if the tests pass there, the failures point to a mismerge.

In this case, the same tests failed to pass in the standard Python branch. While these spurious failures increase the time and effort required to do the merge, they are of course better than having mismerged. They can also be ignored, as the goal of Stackless is to be backwards compatible with standard Python...
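The triage described above can be written down as a small set comparison. This is only a sketch: the function name, the dictionary keys and the example sets are made up for illustration, and it assumes the failing test names from each of the three builds have already been collected by hand.

```python
def classify_failures(stackless_on, stackless_off, upstream):
    """Sort failing test names into likely causes.

    Each argument is a set of failing test names from one build: the
    merged tree with Stackless enabled, the merged tree compiled with
    STACKLESS_OFF, and the untouched standard Python branch.
    """
    return {
        # Already broken in standard Python: not caused by the merge.
        "upstream": stackless_off & upstream,
        # Broken with Stackless compiled out but fine upstream: suspect a mismerge.
        "mismerge": stackless_off - upstream,
        # Only broken with Stackless enabled: a Stackless behaviour difference.
        "stackless": stackless_on - stackless_off,
    }


# Hypothetical input mirroring the situation in this post.
result = classify_failures(
    {"test_distutils", "test_socket", "test_pickle"},
    {"test_distutils", "test_socket"},
    {"test_distutils", "test_socket"},
)
```

With these inputs nothing lands in the "mismerge" bucket, which matches what happened here.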

The first Python unit test failure was in the distutils tests. It looks like the exported symbol prefix for module initialisation has changed from init to PyInit_, but a unit test had not been changed to generate a method with the new prefix.
test_distutils
xxmodule.c
Creating library c:\users\richard\appdata\local\temp\tmpbc9aj_\Debug\users\richard\appdata\local\temp\tmpbc9aj_\xx_d.lib and object c:\users\richard\appdata\local\temp\tmpbc9aj_\Debug\users\richard\appdata\local\temp\tmpbc9aj_\xx_d.exp
foo.c
LINK : error LNK2001: unresolved external symbol PyInit_foo
c:\users\richard\appdata\local\temp\tmp6u2yky\tempt\users\richard\appdata\local\temp\tmpa9gpqp\foo_d.lib : fatal error LNK1120: 1 unresolved externals
[37422 refs]

D:\SVN\_python\python-branches\py3k-export\py3k>exit 1

D:\SVN\_python\python-branches\py3k-export\py3k>exit 0
test test_distutils failed -- Traceback (most recent call last):
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\msvc9compiler.py", line 635, in link
self.spawn([self.linker] + ld_args)
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\ccompiler.py", line 981, in spawn
spawn(cmd, dry_run=self.dry_run)
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\spawn.py", line 36, in spawn
_spawn_nt(cmd, search_path, dry_run=dry_run)
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\spawn.py", line 77, in _spawn_nt
"command '%s' failed with exit status %d" % (cmd[0], rc))
distutils.errors.DistutilsExecError: command '"c:\Program Files\Microsoft Visual Studio 9.0\VC\BIN\link.exe"' failed with exit status 1120

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\tests\test_build_ext.py", line 320, in test_get_outputs
cmd.run()
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\command\build_ext.py", line 347, in run
self.build_extensions()
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\command\build_ext.py", line 456, in build_extensions
self.build_extension(ext)
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\command\build_ext.py", line 543, in build_extension
target_lang=language)
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\ccompiler.py", line 791, in link_shared_object
extra_preargs, extra_postargs, build_temp, target_lang)
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\distutils\msvc9compiler.py", line 637, in link
raise LinkError(msg)
distutils.errors.LinkError: command '"c:\Program Files\Microsoft Visual Studio 9.0\VC\BIN\link.exe"' failed with exit status 1120
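The unresolved PyInit_foo symbol in the log above comes down to the naming rule for an extension module's initialisation function: Python 2 expects init followed by the module name, Python 3 expects PyInit_ followed by the module name. A minimal sketch of that rule, with a hypothetical helper name:

```python
def init_symbol(module_name, python_major):
    """Return the init function name the interpreter looks for in an extension."""
    prefix = "PyInit_" if python_major >= 3 else "init"
    return prefix + module_name


py3_symbol = init_symbol("foo", 3)
py2_symbol = init_symbol("foo", 2)
```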
The second unit test failure was within the socket unit tests: testSockName could not bind its socket because the address was already in use (Winsock error 10048, WSAEADDRINUSE).
test_socket
test test_socket failed -- Traceback (most recent call last):
File "D:\SVN\_python\python-branches\py3k-export\py3k\lib\test\test_socket.py", line 498, in testSockName
sock.bind(("0.0.0.0", port))
socket.error: [Errno 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

Legitimate standard unit test failures

Recompiling the merged source code with Stackless enabled showed an additional unit test failure in test_pickle.py. Stackless allows pickling of running code and in order to do this, it alters what gets pickled. In this case, the fix was simply pickling the problematic object, taking the result and updating the unit test to include it. Of course, the case where Stackless is disabled still needs to be handled.
try:
    import stackless
    DATA4 = b'\x80\x02cstackless._wrap\nrange\nq\x00K\x00K\x05K\x01\x87q\x01Rq\x02)b.'
except ImportError:
    # Stackless is disabled or not present: expect the standard pickle.
    DATA4 = b'\x80\x02c__builtin__\nxrange\nq\x00K\x00K\x05K\x01\x87q\x01Rq\x02.'
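To see how the two expected values differ without unpickling either of them, the standard pickletools module can list the opcodes. The sketch below uses pickletools.genops on the two byte strings from the test; the helper names and the two constant names are mine.

```python
import pickletools

DATA4_STACKLESS = b'\x80\x02cstackless._wrap\nrange\nq\x00K\x00K\x05K\x01\x87q\x01Rq\x02)b.'
DATA4_STANDARD = b'\x80\x02c__builtin__\nxrange\nq\x00K\x00K\x05K\x01\x87q\x01Rq\x02.'


def opcode_summary(data):
    """List (opcode name, argument) pairs without executing the pickle."""
    return [(op.name, arg) for op, arg, pos in pickletools.genops(data)]


def global_refs(data):
    """Return the 'module name' strings of every GLOBAL opcode in the pickle."""
    return [arg for name, arg in opcode_summary(data) if name == "GLOBAL"]


standard_globals = global_refs(DATA4_STANDARD)
stackless_globals = global_refs(DATA4_STACKLESS)
# The Stackless pickle also ends with an extra EMPTY_TUPLE and BUILD pair.
stackless_tail = [name for name, arg in opcode_summary(DATA4_STACKLESS)][-3:]
```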

Stackless pickling unit test failure

There was one failure in the Stackless unit tests, to do with pickling a recursive function. Editing the test and reconciling what comes out of pickletools.dis with the test failure stack trace made it clear what the problem was: dynamic behaviour in the unittest.py module. Because there were self references in the recursive function being pickled, the test suite instance was being included with the pickled function. On unpickling, a __getattr__ hook in the _WritelnDecorator text UI test running support class was entering an infinite loop. The correct fix here was to remove the self reference to stop the test case instance being dragged into the pickle.
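The infinite loop can be reproduced in isolation. The sketch below is not the unittest code: Delegating is a simplified stand-in that forwards unknown attributes to self.stream, and probe imitates what an unpickler does, namely creating the instance without calling __init__ and then looking up an optional hook on it. The guarded variant shows one common way to break the loop, which is different from the fix I actually made.

```python
class Delegating(object):
    """Simplified stand-in for a stream wrapper; not the real unittest class."""

    def __init__(self, stream):
        self.stream = stream

    def __getattr__(self, name):
        # Only called when normal lookup fails. If "stream" itself is
        # missing, this lookup calls __getattr__ again, without end.
        return getattr(self.stream, name)


class Guarded(Delegating):
    def __getattr__(self, name):
        if name == "stream":
            # Refuse to delegate the lookup of the delegate itself.
            raise AttributeError(name)
        return getattr(self.stream, name)


def probe(cls):
    """Create an instance the way an unpickler does and look up a hook."""
    obj = cls.__new__(cls)  # __init__ is not called, so no stream attribute
    try:
        return getattr(obj, "__setstate__", None)
    except RecursionError:
        return "recursion"


unguarded_result = probe(Delegating)
guarded_result = probe(Guarded)
```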

TortoiseSVN problems

It used to be that TortoiseSVN just worked. I'd go through the trauma of merging a Python branch and building an installer, and then I could rely on it to do its job. But these days, every time I do a merge, it seems that TortoiseSVN gets flakier and flakier.

When I went to commit the merged Stackless code into our py3k branch, TortoiseSVN complained that a new file from the original Python branch was already present. I tried to do a "clean up" on the relevant directory, which did not work. I tried to exclude the relevant directory from the commit operation, which resulted in another unclear and apparently irrelevant error to do with recursive commits needing to be done a certain way. Eventually, what worked was reverting the directory and re-adding it.

With the merged changes committed, there was one more code change to do. The configure script is a Unix script and is generated from another file, configure.in. It does not merge well, so the best approach is to not merge it and, once all the other merges are checked in, fetch the branch on a Linux machine and rebuild configure there. Then I download the regenerated file to my local machine via the web, and commit it locally. Normally this is as straightforward as it sounds and just works, but this time TortoiseSVN complained about inconsistent line endings. The file has the Subversion property for native line endings set, and in the past it was just accepted, but not any more. What worked was loading the file in a decent editor that allowed me to save it in "unix" format.
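An editor is not strictly needed for the line ending fix. As an alternative sketch (the helper name is hypothetical, and this is not what I actually did), the same normalisation is a couple of byte replacements:

```python
def to_unix_line_endings(data):
    """Convert CRLF and lone CR line endings in a bytes object to LF."""
    return data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")


mixed = b"#! /bin/sh\r\necho one\necho two\r\n"
normalised = to_unix_line_endings(mixed)
```

The file would need to be read and written in binary mode so that Python does not translate the line endings itself.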

Doing the release

I am not releasing Stackless Python 3.1 yet. In order to allow extensions to be compatible with Python builds, there is an official version of Visual Studio that needs to be used for both of these things. And I do not have access to an install of Visual Studio 2008 at this time. At some later stage, I might have the time and interest to download Visual C++ Express 2008, and build the relevant binaries. But I cannot guarantee that the express version of Visual Studio can build installers anyway, which might make it pointless.

So, at this time, Stackless Python 3.1 is not officially released.

Friday 31 July 2009

IOCP-based sockets with ctypes in Python: 7

Previous post: IOCP-based sockets with ctypes in Python: 6

In the last attempt, I managed to get a minimal amount of standard socket code making use of the functionality Stackless Python provides, working within my script. But if I am to make a proper replacement socket module, I need to be able to use it in the same way stacklesssocket.py is used. To this end, the use case I want to support is Andrew Dalke's "more naturally usable" example script.

As can be seen in the script, the following code was the way the replacement socket module was put in place:

import stacklesssocket
sys.modules["socket"] = stacklesssocket
Back in Python 2.5, when it was being developed, the socket module was a lot simpler and the approach of making a complete replacement module was feasible. These days however, in later versions of Python 2 and also in Python 3, the socket module has become somewhat more complicated, and the approach of leaving the standard version in place and substituting parts became more practical. This led to new code to substitute in that functionality:
import stacklesssocket
stacklesssocket.install()
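The difference between the two approaches can be shown with stand-in modules, so that the sketch runs anywhere and leaves the real socket module alone. Everything here is hypothetical: the fake module names, the helper functions and the string placeholders standing in for classes.

```python
import sys
import types

# Stand-ins for the standard module and the replacement module.
std = types.ModuleType("fake_socket")
std.socket = "standard socket class"
std.gethostname = lambda: "localhost"

replacement = types.ModuleType("fake_stacklesssocket")
replacement.socket = "tasklet-friendly socket class"


def install_whole_module(name, module):
    """Old approach: later imports of `name` get the replacement module."""
    sys.modules[name] = module


_saved = {}


def install_parts(target, source, names):
    """Newer approach: swap selected attributes, leave the rest in place."""
    for name in names:
        _saved[name] = getattr(target, name)
        setattr(target, name, getattr(source, name))


def uninstall_parts(target):
    """Put back whatever install_parts replaced."""
    while _saved:
        name, original = _saved.popitem()
        setattr(target, name, original)


install_parts(std, replacement, ["socket"])
patched_socket = std.socket
untouched_helper = std.gethostname()
uninstall_parts(std)
restored_socket = std.socket
```

With the partial approach, anything the replacement does not provide (gethostname in this sketch) keeps working unchanged.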

The goal

Andrew's example script, the supporting of which is my goal, contains the following code:
import sys
import stacklesssocket
import stackless

stacklesssocket.install()

import urllib
import time

def download(uri):
    t1 = time.time()
    f = urllib.urlopen(uri)
    s = f.read()
    t2 = time.time()
    print "Downloaded", uri, "in", "%.1f" % (t2-t1), "seconds"
    return t2-t1


print " === Serial === "
t1 = time.time()
download("http://www.stackless.com/wiki/Tasklets")
download("http://www.stackless.com/wiki/Channels")
t2 = time.time()
print " --->", t2-t1

print " === Parallel === "
t1 = time.time()
stackless.tasklet(download)("http://www.stackless.com/wiki/Tasklets")
stackless.tasklet(download)("http://www.stackless.com/wiki/Channels")
stackless.run()
t2 = time.time()
print " --->", t2-t1
The most important aspect of this is how it uses another higher level standard library module (urllib) that in turn uses the standard socket module. By using my replacement functionality in this way, I can verify how compatible and stable it might be.

The implementation

The original stacklesssocket.py module was based on asyncore, another framework that wraps select. Back when I wrote it, I was glad to have it there to use, but the fact is that adopting a framework brings in dependencies and arbitrary abstraction. If I had the time and interest, I would rewrite stacklesssocket.py to use select directly.

At this time, I am developing for Stackless Python 2.6.2. Right off the bat, it was obvious that this new module was going to be somewhat simpler. There were several substitutions to the standard socket module for stacklesssocket.py, but in this case it looks like I just need to substitute my own _realsocket object.
def install():
    stdsocket._realsocket = socket
The first obstacle was the lack of a connect method on my replacement socket object. Checking out the available functions on MSDN, ConnectEx was the one which suited my overlapped IO needs. However, I was unable to find it exported from any Winsock related DLL. It turns out you need to call WSAIoctl in order to get a pointer to it. I had been avoiding calling WSAIoctl as I assumed it was only used to make faster function calls, but as it is the gateway to functionality I need, that is no longer something I can do.
int WSAIoctl(
__in SOCKET s,
__in DWORD dwIoControlCode,
__in LPVOID lpvInBuffer,
__in DWORD cbInBuffer,
__out LPVOID lpvOutBuffer,
__in DWORD cbOutBuffer,
__out LPDWORD lpcbBytesReturned,
__in LPWSAOVERLAPPED lpOverlapped,
__in LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
The corresponding ctypes definition is as follows:
WSAIoctl = windll.Ws2_32.WSAIoctl
WSAIoctl.argtypes = (SOCKET, DWORD, c_void_p, DWORD, c_void_p, DWORD, POINTER(DWORD), POINTER(OVERLAPPED), c_void_p)
WSAIoctl.restype = c_int
The appropriate dwIoControlCode value to get a function pointer is SIO_GET_EXTENSION_FUNCTION_POINTER. The value of lpvInBuffer to indicate that the pointer to ConnectEx is the one I want, is the following GUID.
{ 0x25a207b9,0xddf3,0x4660, { 0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e }}
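For reference, here is one way the GUID could be expressed with ctypes. This is a sketch rather than the definition from my script: the GUID class layout follows the usual Windows definition, fixed-width integer types are used so that the structure has the same size on any platform, and the numeric value of SIO_GET_EXTENSION_FUNCTION_POINTER is written out from the Winsock headers.

```python
import ctypes
import uuid


class GUID(ctypes.LittleEndianStructure):
    # Same layout as the Windows GUID structure (16 bytes).
    _fields_ = [
        ("Data1", ctypes.c_uint32),
        ("Data2", ctypes.c_uint16),
        ("Data3", ctypes.c_uint16),
        ("Data4", ctypes.c_ubyte * 8),
    ]


WSAID_CONNECTEX = GUID(
    0x25a207b9, 0xddf3, 0x4660,
    (ctypes.c_ubyte * 8)(0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e),
)

# _WSAIORW(IOC_WS2, 6) from the Winsock headers.
SIO_GET_EXTENSION_FUNCTION_POINTER = 0xC8000006

guid_size = ctypes.sizeof(GUID)
guid_bytes = bytes(WSAID_CONNECTEX)
```

The in-memory bytes can be cross-checked against the standard uuid module, whose bytes_le attribute uses the same little-endian field layout.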
I'm not doing this as an overlapped operation, as I can't see how it could end up blocking in any meaningful way. This resulted in the following code in the replacement socket's __init__ method:
        self.wsFnConnectEx = ConnectExFunc()
        dwBytes = DWORD()
        ret = WSAIoctl(_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, byref(WSAID_CONNECTEX), sizeof(WSAID_CONNECTEX), byref(self.wsFnConnectEx), sizeof(self.wsFnConnectEx), byref(dwBytes), cast(0, POINTER(OVERLAPPED)), 0)
        if ret == SOCKET_ERROR:
            err = WSAGetLastError()
            # Close the socket itself; ret only holds the WSAIoctl return code.
            closesocket(_socket)
            raise WinError(err)
With this out of the way, I can now move onto fleshing out the connect method.
    def connect(self, address):
        host, port = address

        self.bind(("0.0.0.0", 0))

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        bytesSent = DWORD(0)
        ovConnect = OVERLAPPED()
        c = ovConnect.channel = stackless.channel()

        ret = self.wsFnConnectEx(self._socket, sa, sizeof(sa), 0, 0, NULL, byref(ovConnect))
        if ret == FALSE:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                # Pass the code already fetched rather than querying it again.
                raise WinError(err)

        activeIO[self._socket] = c
        c.receive()
ConnectEx was not working on the first attempt. It turns out you need to bind a socket to a local address before you can connect using it. Oh well, time to retry Andrew's script and see how I go.

This time, the lack of a sendall method was preventing the script from running. sendall just continues calling send for the same set of data, until all of it has been sent.
    def sendall(self, msg, flags=None):
        bytesSent = self.send(msg)
        while bytesSent < len(msg):
            bytesSent += self.send(msg[bytesSent:])
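The reason the loop is needed is that send is allowed to accept only part of the data. The sketch below makes that visible with a hypothetical stand-in socket whose send never takes more than four bytes at a time; sendall here is a free function rather than a method, purely to keep the example short.

```python
class ChunkySocket(object):
    """Hypothetical stand-in whose send() accepts at most 4 bytes per call."""

    def __init__(self):
        self.received = b""
        self.calls = 0

    def send(self, data):
        self.calls += 1
        chunk = data[:4]
        self.received += chunk
        return len(chunk)


def sendall(sock, msg):
    """Keep calling send with the unsent remainder until everything is sent."""
    sent = 0
    while sent < len(msg):
        sent += sock.send(msg[sent:])


sock = ChunkySocket()
sendall(sock, b"hello world")
```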
At this point I decided to take note of the fact that operations performed with an overlapped object may actually complete right away. However, my mistake was in assuming that if this happened, a completion packet wouldn't arrive through GetQueuedCompletionStatus. This led to crashes when I examined the channel attribute on arrived packets whose overlapped object had already been garbage collected, due to the exit of the function that started the operation.

In order to clean things up, I decided to store a reference to the overlapped object in a global variable and, in addition, to move the channel reference out of the overlapped object and store it alongside. I could have stored the socket handle in the overlapped object, but since CreateIoCompletionPort allows me to associate a completion key with the socket, I store it there instead.
        # Bind the socket to the shared IO completion port.
        CreateIoCompletionPort(_socket, hIOCP, _socket, NULL)
The idea was that I could then ignore packets that arrived for earlier operations which had actually completed immediately. Of course, the calling logic, having gotten a result for one IO operation, may start another which this time might block. The unwanted packets for the operations that had completed immediately would then come in and be mistaken for the followup blocking operation. Errors ensued. So each completion packet needs to be matched up with the operation state stored for it.
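The bookkeeping this calls for is small. The sketch below uses a hypothetical OperationTable class and plain strings in place of channels. Each started operation gets its own ID and is stored under a (socket, ID) key together with its overlapped object, which also keeps that object alive until its packet has arrived.

```python
import itertools


class OperationTable(object):
    """Match completion packets to the operation that started them."""

    def __init__(self):
        self._next_id = itertools.count(1)
        self._active = {}

    def register(self, sock, channel, overlapped):
        # Holding the overlapped object here stops it being garbage
        # collected before its completion packet arrives.
        op_id = next(self._next_id)
        self._active[(sock, op_id)] = (channel, overlapped)
        return op_id

    def complete(self, sock, op_id):
        channel, _overlapped = self._active.pop((sock, op_id))
        return channel

    def __len__(self):
        return len(self._active)


table = OperationTable()
first = table.register(7, "channel for first op", object())
second = table.register(7, "channel for second op", object())
# Packets may arrive in any order and still find the right channel.
second_channel = table.complete(7, second)
first_channel = table.complete(7, first)
```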

This left connect in the following state:
    def connect(self, address):
        host, port = address

        self.bind(("0.0.0.0", 0))

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        bytesSent = DWORD(0)
        ovConnect = OVERLAPPED()
        opID = ovConnect.opID = self.__getnextopid()
        c = stackless.channel()
        c.preference = 0
        ovConnect.label = "connect"

        # Keep the channel and the overlapped object alive until the packet arrives.
        activeOps[(self._socket, opID)] = (c, ovConnect)

        ret = self.wsFnConnectEx(self._socket, sa, sizeof(sa), 0, 0, NULL, byref(ovConnect))
        if ret == FALSE:
            err = WSAGetLastError()
            if err != ERROR_IO_PENDING:
                raise WinError(err)

        c.receive()
The other overlapped IO operations also needed to be changed to match, along with _DispatchIOCP.
def _DispatchIOCP():
    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    def _GetCompletionChannel(completionKey, overlappedPtr):
        # Look up and remove the stored state for this completed operation.
        _socket = completionKey.value
        opID = overlappedPtr.contents.opID
        k = _socket, opID
        c, ovRef = activeOps[k]
        del activeOps[k]

        return c

    while stackless.runcount > 2:
        while True:
            stackless.schedule()

            c = None
            ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 50)
            if ret == FALSE:
                err = WSAGetLastError()
                if err == WAIT_TIMEOUT:
                    continue

                # No packet was dequeued, so there is no operation to blame.
                if not bool(ovCompletedPtr):
                    raise WinError(err)

                c = _GetCompletionChannel(completionKey, ovCompletedPtr)
                c.send_exception(WinError, err)
                continue

            break

        c = _GetCompletionChannel(completionKey, ovCompletedPtr)
        if c.balance == -1:
            c.send(numberOfBytes.value)
One note is that accessing ovCompletedPtr.contents will raise an error if the pointer is NULL. The safe way with ctypes to ascertain whether a pointer is NULL is to check whether its boolean value is False.
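This ctypes behaviour is easy to check on its own, without any Windows functions. In the sketch below, a pointer instance created with no arguments is NULL; the helper name is hypothetical.

```python
import ctypes

null_ptr = ctypes.POINTER(ctypes.c_int)()   # no target given, so NULL
value = ctypes.c_int(42)
valid_ptr = ctypes.pointer(value)


def deref_or_none(ptr):
    """Return the pointed-to value, or None for a NULL pointer."""
    if not ptr:
        return None
    return ptr.contents.value


null_result = deref_or_none(null_ptr)
valid_result = deref_or_none(valid_ptr)

try:
    null_ptr.contents
    null_access_error = None
except ValueError as exc:
    # ctypes refuses to dereference a NULL pointer.
    null_access_error = exc
```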

Running Andrew's script at this point results in it exiting cleanly, but mid-execution. The reason for this is the condition stackless.runcount > 2. The theory is that this should keep the loop running as long as there are at least two tasklets in the scheduler. This is just plain wrong and ill thought out. The fact is that at any given time, at least for Andrew's script, the runcount is only likely to be one. The reason for this is that all the other tasklets doing IO will be outside of the scheduler, blocked on a channel. So the lowest possible valid value for runcount that will require continued looping will be one.

Taking this into account, I added a weakref dictionary to track the socket objects which were live and added it as an additional reason to loop. This in no way addresses the above problem, but it does the trick. Andrew's script now runs, and gives the following output:
 === Serial ===
Downloaded http://www.stackless.com/wiki/Tasklets in 3.4 seconds
Downloaded http://www.stackless.com/wiki/Channels in 3.9 seconds
---> 7.26900005341
=== Parallel ===
Downloaded http://www.stackless.com/wiki/Channels in 3.3 seconds
Downloaded http://www.stackless.com/wiki/Tasklets in 3.5 seconds
---> 3.51300001144
Success (of a sort)!
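The weakref dictionary used to keep the polling loop alive could look something like the following sketch. The class and function names are hypothetical, and the exact loop condition is an assumption on my part for the purposes of illustration. The point is only that an entry disappears by itself once nothing else references the socket object.

```python
import gc
import weakref

# Maps socket handle -> socket object, without keeping the object alive.
live_sockets = weakref.WeakValueDictionary()


class TrackedSocket(object):
    def __init__(self, handle):
        self.handle = handle
        live_sockets[handle] = self


def should_keep_polling(runcount):
    """Loop while other tasklets are runnable or any socket object is still alive."""
    return runcount > 1 or len(live_sockets) > 0


sock = TrackedSocket(1)
while_open = should_keep_polling(1)
del sock
gc.collect()  # not needed on CPython, but harmless
after_close = should_keep_polling(1)
```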

Of course the module in no way provides the full functionality of the standard version yet. Trivial aspects no-one in their right mind should be using, like UDP and so forth, are completely unsupported.

Script source code: 06 - Polished Socket.py

Sims bakery: Lambs fry and bacon pie

Driving past Sims bakery this morning, I bought several interesting things. I always keep an eye out for interesting pies, and lambs fry and bacon sounded pretty tasty.

2009-07-31 - Sims bakery - 01 - Lambs fry and bacon pie

The inside is pretty disappointing and there is very little bacon or lambs fry. Most of the pie was a thick glutinous translucent "gravy" which was pretty tasteless.

2009-07-31 - Sims bakery - 02 - Lambs fry and bacon pie innards

Curing bacon: Day 8

Previous post: Curing bacon: Day 6

Yesterday, the pork was turned and had more of the salt/sugar mix applied again. Today, it had sat long enough and it was time to smoke it.

The first step was to wash off the sugar. The idea is that the sugar coating, if left on the pork, would cause it to go off sooner. That, and it probably would not taste as nice. So both pieces of pork were washed in a bucket of warm water and scrubbed to remove the sugar. Then they were hung outside to drip dry, so that they wouldn't drip inside the garage when they were hung there.

2009-07-30 - Curing bacon - 01 - Drip drying

After having hung outside for around ten minutes, they were brought inside and hung in front of a large fan to be blown dry. The smaller piece turned in the wind, but the larger piece didn't and had to be hand turned after several hours. After several more hours, they were left to hang with the fan turned off overnight.

2009-07-30 - Curing bacon - 02 - Fan drying

The next morning, being dry, they were ready to be taken down.

2009-07-31 - Curing bacon - 01 - Fan dried

They were both then hung in the smoker and left for several hours.

2009-07-31 - Curing bacon - 02 - Smoking

After which, they were done.

2009-07-31 - Curing bacon - 03 - Smoked

And stored away in the fridge to sit for a few days, before being tried.

2009-07-31 - Curing bacon - 04 - Done

Tuesday 28 July 2009

Curing bacon: Day 6

Previous post: Curing bacon: Day 5

At this stage, the pork had been sitting in the "Honey-Glo" cure liquid for two days, having been turned over after the first day.

2009-07-28 - Curing bacon - 01 - Current state

I poured the liquid down the drain, to get it ready to be given another coating of the salt/sugar rubbing mix. The meat definitely looks a more palatable red colour after soaking in the liquid.

2009-07-28 - Curing bacon - 02 - Drained

This is how the pork is looking after the salt/sugar rub.

2009-07-28 - Curing bacon - 03 - Salt-sugar rub

Next post: Curing bacon: Day 8

IOCP-based sockets with ctypes in Python: 6

Previous post: IOCP-based sockets with ctypes in Python: 5

The goal of this step is to implement part of a Stackless-compatible socket module, providing the basic functionality the standard one has. It should be usable to provide blocking socket IO within Stackless tasklets.

The most straightforward approach is to base the code on how it was done with stacklesssocket.py. In stacklesssocket.py, a tasklet is launched to poll asyncore.

managerRunning = False

def ManageSockets():
    global managerRunning

    while len(asyncore.socket_map):
        # Check the sockets for activity.
        asyncore.poll(0.05)
        # Yield to give other tasklets a chance to be scheduled.
        stackless.schedule()

    managerRunning = False
There I was looping and polling asyncore. I can easily substitute in the polling I already have built up with GetQueuedCompletionStatus.
managerRunning = False

def ManageSockets():
    global managerRunning

    wsaData = WSADATA()
    ret = WSAStartup(MAKEWORD(2, 2), LP_WSADATA(wsaData))
    if ret != 0:
        raise WinError(ret)

    hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL_HANDLE, NULL, NULL)
    if hIOCP == 0:
        WSACleanup()
        raise WinError()

    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    while True:
        while True:
            # Yield to give other tasklets a chance to be scheduled.
            stackless.schedule()

            ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 50)
            if ret == FALSE:
                err = WSAGetLastError()
                if err == WAIT_TIMEOUT:
                    continue

                ovCompletedPtr.contents.channel.send_exception(WinError, err)
                continue

            break

        # Handle the completed packet.
        ovCompletedPtr.contents.channel.send(numberOfBytes.value)

    managerRunning = False
There are several things to note here.

As in the asyncore polling, I regularly call stackless.schedule() in order to yield to the scheduler and allow any other tasklets that might be present within it to get a chance to execute.

The custom attribute channel in the OVERLAPPED structure is expected to have had a stackless.channel instance assigned to it. In the case of an error, I can wake up the tasklet that initiated the asynchronous IO and raise an exception on it. In the case of success, I can return important information to the logic it invoked to trigger that IO, so that it can more easily handle the success case. The only piece of information I can see being of use is numberOfBytes, as that saves me from having to query it manually using WSAGetOverlappedResult.

Now I need to make a socket class that is compatible with the standard Python one.
class socket:
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=IPPROTO_TCP):
        ret = WSASocket(family, type, proto, None, 0, WSA_FLAG_OVERLAPPED)
        if ret == INVALID_SOCKET:
            raise WinError()

        # Bind the socket to the shared IO completion port.
        CreateIoCompletionPort(ret, hIOCP, NULL, NULL)

        self._socket = ret
That gives me a fully set up socket object on instantiation. Refactoring the send, recv, accept and bind methods onto it from the code I have already written should be similarly straightforward.

Starting with send:
    def send(self, data):
        self.sendBuffer[0].buf = data
        self.sendBuffer[0].len = len(data)

        bytesSent = DWORD()
        ovSend = OVERLAPPED()
        c = ovSend.channel = stackless.channel()

        ret = WSASend(self._socket, cast(self.sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
        if ret != 0:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                Cleanup()
                raise WinError(err)

        # Return the number of bytes that were sent.
        return c.receive()
This was a pretty direct conversion. The only additions were setting the channel attribute, blocking on the channel until either the number of bytes sent is returned or an exception is raised through it, and returning the number of bytes sent to the caller.

Next is the recv method:
    def recv(self, byteCount, flags=0):
        if self.recvBuffer is None:
            self.recvBuffer = (WSABUF * 1)()
            self.recvBuffer[0].buf = ' ' * READ_BUFFER_SIZE
            self.recvBuffer[0].len = READ_BUFFER_SIZE

        # WARNING: For now, we cap the readable amount to size of the preallocated buffer.
        byteCount = min(byteCount, READ_BUFFER_SIZE)

        numberOfBytesRecvd = DWORD()
        flags = DWORD()
        ovRecv = OVERLAPPED()
        c = ovRecv.channel = stackless.channel()

        ret = WSARecv(self._socket, cast(self.recvBuffer, POINTER(WSABUF)), 1, byref(numberOfBytesRecvd), byref(flags), byref(ovRecv), 0)
        if ret != 0:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                raise WinError(err)

        # Block until the overlapped operation completes.
        numberOfBytes = c.receive()
        return self.recvBuffer[0].buf[:numberOfBytes]
This was also straightforward. Again, a channel was provided and blocked upon. A received data buffer is created on the first call and reused on subsequent calls, and the amount of data that is allowed to be received is capped to the size of that buffer. On success and a returned number of bytes received, the appropriate segment of the data buffer is sliced and returned to the caller.

In the event that the remote connection disconnected, the channel would have indicated that 0 bytes were received and this would have resulted in an empty string being returned. This is fine, as it is the right way to indicate a socket has disconnected on a recv call.
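The empty result convention is what lets callers write the usual read loop. As a sketch using the standard socket module rather than my replacement (the helper name is hypothetical), a connected pair of sockets shows the loop ending once the other side has closed:

```python
import socket


def read_until_closed(sock, chunk_size=256):
    """Collect data until recv returns an empty result, meaning the peer closed."""
    chunks = []
    while True:
        data = sock.recv(chunk_size)
        if not data:
            break
        chunks.append(data)
    return b"".join(chunks)


a, b = socket.socketpair()
a.sendall(b"hello")
a.close()                      # the peer disconnects after sending
result = read_until_closed(b)
b.close()
```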

bind is extremely straightforward, so that came next:
    def bind(self, address):
        host, port = address

        sa = sockaddr_in()
        sa.sin_family = AF_INET
        sa.sin_addr.s_addr = inet_addr(host)
        sa.sin_port = htons(port)

        ret = bind(self._socket, sockaddr_inp(sa), sizeof(sa))
        if ret == SOCKET_ERROR:
            raise WinError()
As is listen:
    def listen(self, backlog):
        ret = listen(self._socket, backlog)
        if ret != 0:
            raise WinError()
A little more complex is accept:
    def accept(self):
        dwReceiveDataLength = 0
        dwLocalAddressLength = sizeof(sockaddr_in) + 16
        dwRemoteAddressLength = sizeof(sockaddr_in) + 16
        outputBuffer = create_string_buffer(dwReceiveDataLength + dwLocalAddressLength + dwRemoteAddressLength)

        dwBytesReceived = DWORD()
        ovAccept = OVERLAPPED()
        c = ovAccept.channel = stackless.channel()

        acceptSocket = socket()

        ret = AcceptEx(self._socket, acceptSocket._socket, outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(dwBytesReceived), byref(ovAccept))
        if ret == FALSE:
            err = WSAGetLastError()
            # The operation was successful and is currently in progress. Ignore this error...
            if err != ERROR_IO_PENDING:
                closesocket(acceptSocket._socket)
                raise WinError(err)

        # Block until the overlapped operation completes.
        c.receive()

        localSockaddr = sockaddr_in()
        localSockaddrSize = c_int(sizeof(sockaddr_in))
        remoteSockaddr = sockaddr_in()
        remoteSockaddrSize = c_int(sizeof(sockaddr_in))

        GetAcceptExSockaddrs(outputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, byref(localSockaddr), byref(localSockaddrSize), byref(remoteSockaddr), byref(remoteSockaddrSize))

        hostbuf = create_string_buffer(NI_MAXHOST)
        servbuf = c_char_p()

        port = ntohs(localSockaddr.sin_port)

        localSockaddr.sin_family = AF_INET
        ret = getnameinfo(localSockaddr, sizeof(sockaddr_in), hostbuf, sizeof(hostbuf), servbuf, 0, NI_NUMERICHOST)
        if ret != 0:
            err = WSAGetLastError()
            closesocket(acceptSocket._socket)
            raise WinError(err)

        # host = inet_ntoa(localSockaddr.sin_addr)

        return (acceptSocket, (hostbuf.value, port))
This was refactored in much the same way as the other asynchronous methods (send and recv), with a channel to block for the result upon. However it got a little more complex when it came to providing the expected address part of the return value. When I connect to a standard Python socket listening on my local machine, with a telnet connection from the same machine, I see the host as the standard localhost address (127.0.0.1). However, this did not give the same result:
host = inet_ntoa(localSockaddr.sin_addr)
Instead it would give the address 0.0.0.0. So, I looked at how the low-level Python C source code did it, and saw it was calling getnameinfo. My next attempt based on that, was the following code:
        localSockaddr.sin_family = AF_INET
        ret = getnameinfo(localSockaddr, sizeof(sockaddr_in), hostbuf, sizeof(hostbuf), servbuf, 0, NI_NUMERICHOST)
        if ret != 0:
            err = WSAGetLastError()
            closesocket(acceptSocket._socket)
            raise WinError(err)
However, this gives the same host address. For now, you can see both in the accept logic. I have to assume that this is just some idiosyncrasy that does not really matter.

One thing I did notice though, was that my in_addr structure was not working correctly. It turns out that I had defined the in_addr part of the sockaddr_in structure incorrectly.

Instead of:
class _UN(Structure):
    _fields_ = [
        ("s_un_b", _UN_b),
        ("s_un_w", _UN_w),
        ("s_addr", c_ulong),
    ]

class in_addr(Union):
    _fields_ = [
        ("s_un", _UN),
    ]
    _anonymous_ = ("s_un",)
I should have had:
class in_addr(Union):
    _fields_ = [
        ("s_un_b", _UN_b),
        ("s_un_w", _UN_w),
        ("s_addr", c_ulong),
    ]
As a foreign function interface solution, ctypes is extremely impressive and allows you to do pretty much anything you want. Union structures however, seem to be strangely hard to get right. With this change, now the in_addr field of sockaddr_in is working correctly, although it did not fix my host address problem.
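The difference between the two definitions shows up clearly in the sizes. The sketch below is a portable approximation of my definitions: it uses c_uint32 where my script uses c_ulong (which is only 32 bits wide on Windows), the member names inside _UN_b and _UN_w follow the Winsock headers, and wrong_in_addr is a hypothetical name for the mistaken layout. In a Union all three views overlay the same four bytes, whereas in a Structure they are laid out one after another.

```python
import ctypes
import socket
import struct


class _UN_b(ctypes.Structure):
    _fields_ = [("s_b1", ctypes.c_ubyte), ("s_b2", ctypes.c_ubyte),
                ("s_b3", ctypes.c_ubyte), ("s_b4", ctypes.c_ubyte)]


class _UN_w(ctypes.Structure):
    _fields_ = [("s_w1", ctypes.c_ushort), ("s_w2", ctypes.c_ushort)]


_FIELDS = [("s_un_b", _UN_b), ("s_un_w", _UN_w), ("s_addr", ctypes.c_uint32)]


class in_addr(ctypes.Union):
    # Correct: the three members share the same 4 bytes.
    _fields_ = _FIELDS


class wrong_in_addr(ctypes.Structure):
    # Mistaken: the three members sit side by side, 12 bytes in total.
    _fields_ = _FIELDS


addr = in_addr()
# inet_aton gives the address in network byte order; "=I" keeps those
# bytes in the same order in memory.
addr.s_addr = struct.unpack("=I", socket.inet_aton("127.0.0.1"))[0]
octets = (addr.s_un_b.s_b1, addr.s_un_b.s_b2, addr.s_un_b.s_b3, addr.s_un_b.s_b4)
```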

This completes the basic range of socket functionality. A good next step from here is to write a simple echo server, with standard Python blocking socket usage within Stackless Python.
def Run():
    address = ("127.0.0.1", 3000)
    listenSocket = socket(AF_INET, SOCK_STREAM)
    listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    listenSocket.bind(address)
    listenSocket.listen(5)

    def handle_echo(_socket, _address):
        # Use the socket passed in, not the enclosing currentSocket variable,
        # which is rebound each time a new connection is accepted.
        while True:
            data = _socket.recv(256)
            if data == "":
                print _address, "DISCONNECTED"
                return

            print _address, "READ", data, len(data)
            dlen = _socket.send(data)
            print _address, "ECHOD", dlen

    while True:
        print "Waiting for new connection"
        currentSocket, clientAddress = listenSocket.accept()
        print "Connection", currentSocket.fileno(), "from", clientAddress

        stackless.tasklet(handle_echo)(currentSocket, clientAddress)

if __name__ == "__main__":
    StartManager()

    print "STARTED"
    stackless.tasklet(Run)()
    stackless.run()
    print "EXITED"
This works perfectly, however there is one problem. When control-c is pressed, the Python interpreter will be killed due to an access violation or some such thing.

A reasonable assumption would be that Stackless tasklets blocked on IOCP related actions are not being cleaned up properly. In order to make sure the required clean up happens, I wrapped the main polling loop with suitable logic.
    try:
        # Call the polling loop.
    finally:
        _CleanupActiveIO()
        CloseHandle(hIOCP)
        WSACleanup()
The definition of _CleanupActiveIO was as follows:
def _CleanupActiveIO():
    for k, v in activeIO.items():
        ret = CancelIo(k)
        if ret == 0:
            raise WinError()

        # Any tasklets blocked on IO are killed silently.
        v.send_exception(TaskletExit)
Raising a real exception on the blocked tasklets causes the interpreter to exit on the given tasklet with that exception. I want them to be killed silently after cleaning up properly, so that the exception on the main tasklet is the one that causes the interpreter to exit and is the one displayed.

In order for the cleanup method to know about the channels in use and tasklets blocked on them, any method that started asynchronous IO had the following line added before its channel receive call.
        activeIO[self._socket] = c
CancelIo is of course a Windows function.
BOOL WINAPI CancelIo(
    __in HANDLE hFile
);
With the matching ctypes definition:
CancelIo = windll.kernel32.CancelIo
CancelIo.argtypes = (HANDLE,)
CancelIo.restype = BOOL
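The bookkeeping described above (register a channel before blocking, then cancel and wake whatever is still registered at shutdown) can be sketched in plain Python. Everything here is a stand-in: FakeChannel, fake_cancel_io and TaskletExitStandIn are hypothetical names replacing the Stackless channel, CancelIo and TaskletExit, so the sketch only shows the shape of the pattern and not the real Windows behaviour.

```python
class TaskletExitStandIn(Exception):
    """Hypothetical stand-in for Stackless's TaskletExit."""

class FakeChannel(object):
    """Hypothetical stand-in for a channel with a tasklet blocked on it."""
    def __init__(self):
        self.exception = None

    def send_exception(self, exc_type):
        # A real channel would raise exc_type inside the blocked tasklet.
        self.exception = exc_type

active_io = {}   # handle -> channel a tasklet is blocked on
cancelled = []   # handles passed to the stand-in cancel function

def fake_cancel_io(handle):
    # Stand-in for CancelIo, which returns non-zero on success.
    cancelled.append(handle)
    return 1

def cleanup_active_io():
    # Iterate over a snapshot so entries can be removed while looping.
    for handle, channel in list(active_io.items()):
        if fake_cancel_io(handle) == 0:
            raise OSError("cancel failed for handle %r" % (handle,))
        channel.send_exception(TaskletExitStandIn)
        del active_io[handle]

def run(poll):
    # The finally clause runs the cleanup however the polling loop exits.
    try:
        poll()
    finally:
        cleanup_active_io()

def interrupted_poll():
    # Simulates Ctrl-C arriving while the polling loop is running.
    raise KeyboardInterrupt

channel = FakeChannel()
active_io[42] = channel   # registered just before blocking on the channel

try:
    run(interrupted_poll)
except KeyboardInterrupt:
    interrupted = True
```

The interrupt still propagates to the caller, but only after every registered operation has been cancelled and its channel woken.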
Next steps

At this point, I have a socket object that can be used with Stackless Python to do synchronous IO that is actually transparently doing asynchronous IO using IO completion ports behind the scenes. It still needs to be fleshed out into a module that can be monkey-patched in place of the standard socket module, as the stacklesssocket.py module can.

Next post: IOCP-based sockets with ctypes in Python: 7
Script source code: 05 - Socket.py

Monday 27 July 2009

Curing bacon: Day 5

Previous post: Curing bacon: Day 3

Didn't get around to doing this yesterday. I also didn't bother taking a picture today. As the curing liquid only covered half the pork, today I turned over the meat ensuring the piece that was on top was now on the bottom. Tomorrow, the liquid will be drained and the meat rerubbed with the salt/sugar mix.

Next post: Curing bacon: Day 6

Sunday 26 July 2009

Curing bacon: Day 3

Previous post: Curing bacon: Day 2

In order to ensure the meat has an appealing red colour, and is not a greyish one, we're injecting a liquid solution into the meat that gives it that colour and ensures that the curing process happens in the thicker parts of the meat.

Weighing an appropriate amount of the "Honey-Glo cure" powder.

2009-07-25 - Curing bacon - 01 - Colouring cure

The powder is then added to warm water, and stirred until it is dissolved. Then salt is repeatedly added and stirred until the water has 55% salinity.

2009-07-25 - Curing bacon - 02 - Colouring liquid

This is the current state of the pork after two days of curing. The lighting is unfortunately a little dark. The accrued liquid was poured down the drain, as it was going to be replaced by the "Honey-Glo cure" liquid.

2009-07-25 - Curing bacon - 03 - Current state

Another piece of pork turned up. In this case it is wild pork, hunted somewhere up in the hills.

2009-07-25 - Curing bacon - 04 - Wild pork

The curing liquid is injected into both pieces of meat. The liquid was poured from the plastic jug into a butcher's injection doodad. Then the doodad was sealed and air pumped in to get the pressure up so that the liquid would get pumped out.

The meat is penetrated in a lot of places to ensure that the liquid is present throughout the inside of the meat. I didn't get a photo of it in action, but the injection spike has holes down the sides, meaning liquid gets squirted sideways in all directions inside the meat - not forward.

2009-07-25 - Curing bacon - 05 - Injecting colouring cure

Injecting in one of the many other locations.

2009-07-25 - Curing bacon - 06 - Injecting colouring cure

With both pieces of meat injected with the colouring liquid, the remaining liquid is poured over them in the container to sit for a day.

2009-07-25 - Curing bacon - 07 - Injected result

There's some plan to shuffle them round tomorrow, so they both get to soak in the liquid.

Next post: Curing bacon: Day 5

Curing bacon: Day 2

Previous post: Curing bacon: Day 1

We're being lazy today and just taking the pork out of the fridge, rubbing the salt/sugar mix onto both sides again and then putting it back into the fridge turned over.

This is the pork after a day of curing.

2009-07-24 - Curing bacon - 01 - Current state

And this is it with salt rubbed in and turned over, ready to go back into the fridge.

2009-07-24 - Curing bacon - 02 - Turned over and mix rubbed

Next post: Curing bacon: Day 3

IOCP-based sockets with ctypes in Python: 5

Previous post: IOCP-based sockets with ctypes in Python: 4

The goal this time is to write an echo server, which will drive the addition of handling receiving data from connected sockets and a more general polling loop.

In order to keep track of the current accept socket and any other connected sockets, I need a dictionary to store the relevant state. The intention is that the key will be the completion key and the value will consist of an arbitrary number of elements, the first two of which are the current state ID and then the socket.

stateByKey = {}
I will only be looking up values for unknown completion keys in the dictionary, for two states, reading and writing.
STATE_WRITING = 1
STATE_READING = 2
I can identify incoming connections being assigned to the accept socket by the LISTEN_COMPLETION_KEY. I create an initial accept socket ready for use when the polling starts.
stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()
After which the main loop of the script can pump the polling for completed packets.
try:
    while Pump():
        pass
    print "*** Unexpected exit."
except KeyboardInterrupt:
    print "*** Keyboard interrupt."
Cleanup()
The Pump function needs to be able to start an overlapped WSARecv operation on a newly connected socket. When it receives the completed packet for that operation, it needs to start an overlapped WSASend operation to send back what it received on the same socket. It also needs to start another overlapped WSARecv operation when the write completes so it can repeat the process.

With regard to the overlapped WSASend operation, I've already defined and used that function in a previous post, so I can fetch and repurpose the relevant logic.

The idea is that the function gets told the socket to send to, what message to send and returns anything that needs to be stored in the state dictionary. The expected return values of the state ID and socket are there, but the reference to the overlapped object is also returned and stored so that it does not get garbage collected as mentioned in the previous post.
def StartOverlappedWrite(_socket, msg):
    sendBuffer = (WSABUF * 1)()
    sendBuffer[0].buf = msg
    sendBuffer[0].len = len(msg)

    bytesSent = DWORD()
    ovSend = OVERLAPPED()

    ret = WSASend(_socket, cast(sendBuffer, POINTER(WSABUF)), 1, byref(bytesSent), 0, byref(ovSend), 0)
    if ret != 0:
        err = WSAGetLastError()
        if err != ERROR_IO_PENDING:
            Cleanup()
            raise WinError(err)

    return STATE_WRITING, _socket, ovSend
The general use of this function is in the form:
stateByKey[completionKeyValue] = StartOverlappedWrite(stateData[1], msg)
In order to call WSARecv, I first need to define it with ctypes.
int WSARecv(
    __in SOCKET s,
    __inout LPWSABUF lpBuffers,
    __in DWORD dwBufferCount,
    __out LPDWORD lpNumberOfBytesRecvd,
    __inout LPDWORD lpFlags,
    __in LPWSAOVERLAPPED lpOverlapped,
    __in LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
With a straightforward ctypes definition.
WSARecv = windll.Ws2_32.WSARecv
WSARecv.argtypes = (SOCKET, POINTER(WSABUF), DWORD, POINTER(DWORD), POINTER(DWORD), POINTER(OVERLAPPED), c_void_p)
WSARecv.restype = c_int
Using it is almost identical to the use of WSASend. The only difference is that I need a buffer for incoming data to be placed into, and I need to return that buffer so that I can see what arrived when the completed packet arrives.
def StartOverlappedRead(_socket):
    recvBuffer = (WSABUF * 1)()
    recvBuffer[0].buf = ' ' * 4096
    recvBuffer[0].len = 4096
    ovRecv = OVERLAPPED()

    numberOfBytesRecvd = DWORD()
    flags = DWORD()
    ret = WSARecv(_socket, cast(recvBuffer, POINTER(WSABUF)), 1, byref(numberOfBytesRecvd), byref(flags), byref(ovRecv), 0)
    if ret != 0:
        err = WSAGetLastError()
        # The operation was successful and is currently in progress.  Ignore this error...
        if err != ERROR_IO_PENDING:
            Cleanup()
            raise WinError(err)

    return STATE_READING, _socket, recvBuffer, ovRecv
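The receive buffer above is a Python string, which Python treats as immutable, so having the operating system write into it is risky. A more cautious alternative, which is not what the script above does, is to allocate writable memory with ctypes.create_string_buffer. The sketch below assumes a WSABUF whose buf field is a POINTER(c_char) (the real definition comes from an earlier post and may differ), make_recv_buffer is a hypothetical helper, and memmove stands in for the OS filling the buffer.

```python
import ctypes

class WSABUF(ctypes.Structure):
    # Local redefinition for illustration only; assumed to follow the
    # Windows layout of a length followed by a pointer to the data.
    _fields_ = [("len", ctypes.c_ulong),
                ("buf", ctypes.POINTER(ctypes.c_char))]

def make_recv_buffer(size=4096):
    # create_string_buffer allocates writable memory owned by ctypes.
    storage = ctypes.create_string_buffer(size)
    wsabuf = WSABUF()
    wsabuf.len = size
    wsabuf.buf = ctypes.cast(storage, ctypes.POINTER(ctypes.c_char))
    # Return both so the caller holds an explicit reference to the memory
    # for as long as the operation is pending, and can read it afterwards.
    return storage, wsabuf

storage, wsabuf = make_recv_buffer(16)
ctypes.memmove(storage, b"hello", 5)   # stand-in for the OS writing 5 bytes
received = storage.raw[:5]             # slice by the reported byte count
```

Slicing storage.raw by the completed byte count also avoids stopping early at an embedded NUL byte in the received data.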
Now this can all be linked into the polling for completed packets.

The special case outside of the echo handling of connected sockets, is the initial handling of newly connected sockets. Here I start an overlapped read operation on the newly connected socket and create a new accept socket for the next incoming connection.

if completionKey.value == LISTEN_COMPLETION_KEY:
    acceptKey, acceptSocket, ignore = stateByKey[LISTEN_COMPLETION_KEY]
    stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()

    # Do an initial read event on the newly connected socket.
    stateByKey[acceptKey] = StartOverlappedRead(acceptSocket)
When a write operation completes a new read operation is started for the socket.
if stateByKey[completionKey.value][0] == STATE_WRITING:
    # We'll use the completion of the write to start the next read.
    stateByKey[completionKey.value] = StartOverlappedRead(stateByKey[completionKey.value][1])
And when a read operation completes, I see how many bytes were read, and write that many bytes from the stored read buffer out to the relevant socket. If 0 bytes were read, this signals that the socket has been disconnected, and it can be forgotten about.

numberOfBytes is an output from the call to GetQueuedCompletionStatus which would be above this code.
if stateByKey[completionKey.value][0] == STATE_READING:
    # We'll use the completion of the read to do the corresponding write.
    _socket, recvBuffer, ovRecv = stateByKey[completionKey.value][1:]

    # No received bytes indicates the connection has disconnected.
    if numberOfBytes.value == 0:
        del stateByKey[completionKey.value]
        print "DISCONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
        return True

    msg = "["+ recvBuffer[0].buf[:numberOfBytes.value] +"]"
    stateByKey[completionKey.value] = StartOverlappedWrite(_socket, msg)
Putting this all together and massaging it a little, gives the following Pump function:
def Pump():
    numberOfBytes = DWORD()
    completionKey = c_ulong()
    ovCompletedPtr = POINTER(OVERLAPPED)()

    while True:
        ret = GetQueuedCompletionStatus(hIOCP, byref(numberOfBytes), byref(completionKey), byref(ovCompletedPtr), 500)
        if ret == FALSE:
            err = WSAGetLastError()
            if err == WAIT_TIMEOUT:
                continue
            Cleanup()
            raise WinError(err)
        break

    if completionKey.value == LISTEN_COMPLETION_KEY:
        acceptKey, acceptSocket, ignore = stateByKey[LISTEN_COMPLETION_KEY]
        stateByKey[LISTEN_COMPLETION_KEY] = CreateAcceptSocket()

        # Do an initial read event on the newly connected socket.
        stateByKey[acceptKey] = StartOverlappedRead(acceptSocket)

        print "CONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
    else:
        stateData = stateByKey[completionKey.value]
        del stateByKey[completionKey.value]

        state = stateData[0]
        if state == STATE_WRITING:
            # We'll use the completion of the write to start the next read.
            stateByKey[completionKey.value] = StartOverlappedRead(stateData[1])
        elif state == STATE_READING:
            # We'll use the completion of the read to do the corresponding write.
            _socket, recvBuffer, ovRecv = stateData[1:]

            # No received bytes indicates the connection has disconnected.
            if numberOfBytes.value == 0:
                print "DISCONNECTION;", len(stateByKey), "SOCKETS REGISTERED"
                return True

            msg = "["+ stateData[2][0].buf[:numberOfBytes.value] +"]"
            stateByKey[completionKey.value] = StartOverlappedWrite(stateData[1], msg)
        else:
            Cleanup()
            raise Exception("Unexpected completion key", completionKey, "state", state)

    return True
And at this point the script implements a functional echo server, handling disconnected sockets as intended.
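The read, write, read cycle that Pump drives for each connected socket can be exercised without any Windows calls. The sketch below is a simplified model under stated assumptions: handle_completion and outbox are hypothetical names, each dictionary value is reduced to just the state ID, and the completed payload is passed in directly instead of being read from a WSABUF.

```python
STATE_WRITING = 1
STATE_READING = 2

def handle_completion(state_by_key, key, payload, outbox):
    """Advance one socket's state for a completed operation.

    payload is the data a completed read delivered (ignored for writes).
    outbox is a list collecting what would have been sent.
    Returns False once the socket has disconnected.
    """
    state = state_by_key.pop(key)
    if state == STATE_WRITING:
        # The write finished, so wait for the next read.
        state_by_key[key] = STATE_READING
    elif state == STATE_READING:
        if len(payload) == 0:
            # Zero bytes read means the peer disconnected; forget the socket.
            return False
        outbox.append(b"[" + payload + b"]")
        state_by_key[key] = STATE_WRITING
    else:
        raise ValueError("unexpected state %r" % (state,))
    return True

state_by_key = {7: STATE_READING}   # a freshly accepted socket, key 7
outbox = []

handle_completion(state_by_key, 7, b"hi", outbox)   # a read completes
handle_completion(state_by_key, 7, b"", outbox)     # the echo write completes
still_connected = handle_completion(state_by_key, 7, b"", outbox)  # 0-byte read
```

After these three events the one message has been echoed back in brackets and the socket's entry has been dropped from the dictionary.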

There are a few things that could be polished. For instance, there's no need to allocate new buffers or overlapped objects for each socket for every operation, as these could easily be reused. But that's not so important.

The tentative long term goal is to build up a base of more or less reasonable code that covers the required range of IO completion port based networking support, in order to potentially write a new stacklesssocket.py module.

Next post: IOCP-based sockets with ctypes in Python: 6
Script source code: 04 - Echoing.py