https://anonbadger.wordpress.com/2010/01/16/ - 16 January, 2016
Last year’s Pycon,
Not everyone was here
But
It wasn’t expected
They would
This year’s pycon
The network informs me
Which gaps will not be filled
Now-friends and tomorrow-friends flowing by
Watch and wave as they pass
Such a privilege!
It’s okay to feel pain
When painful things happen
It’s okay to feel loss
When you’ve lost someone loved
It’s okay to cry
As tears can be appropriate
It’s okay to grieve
Others will grieve by your side
The upstream Ansible Project used to ship rpms and tarballs on their server, releases.ansible.com. For Ansible-2.10+, they’ve (I’m part of the project although not the originator of this decision) decided not to ship rpms anymore and to push people to use pypi.python.org as the official source of the tarballs. This came up in a recent thread on twitter with a question of whether this meant that Ansible was forgetting who it was meant to serve (sysadmins) as sysadmins want to get their software in their platforms’ native packaging format rather than the language packaging format.
I don’t think this decision, in and of itself, means that.
For tarballs, I’m not sure of the rationale but it doesn’t seem like a problem to me. Most python software packaged in Fedora has pypi as the canonical source of the tarballs. Pypi serves as a hosting service for the source code rather than the point from which most Fedora users are going to install the software.
The lack of upstream rpms seems to be what triggered a nerve for some people. I was present for those discussions and I think the reasons make a lot of sense for end users. A few of those reasons:
Getting back to the fear that removing rpms from releases.ansible.com is an indication that ansible is forgetting it is a tool for sysadmins and needs to be shipped in ways that sysadmins will find palatable…. I don’t think the removal of rpms and tarballs indicates that, as the rationale above seems like it will make things better for sysadmins in the end. However, ansible-2.10 is a big realignment of how ansible is developed and shipped, and those changes are going to have costs for sysadmins [2]_, [3]_. nirik (Kevin Fenzi, the Fedora/EPEL ansible package maintainer) and I have been talking on and off about how the Fedora/EPEL ansible rpm should be adapted to minimize those costs. It is a large change, though, and changes are often hard during the transition; even after the transition is over, things may be better in many areas but worse in some others. Ideas about how we can smooth out the things that are worse while taking advantage of the things that are better are appreciated!
The problems driving upstream to make the major changes that are present in 2.10: https://www.ansible.com/blog/thoughts-on-restructuring-the-ansible-project
A newer document, focused on the implementation of the changes proposed above and how they affect end users: https://github.com/ansible-collections/overview/blob/master/README.rst
Vim-8.x gained native plugin handling so that plugin managers like Pathogen and Vundle aren’t needed anymore. Enabling a plugin is as easy as cloning a git repository into the right directory (or unzipping a tarball or making the proper directories or….)
There are a couple of things that aren’t totally convenient, though. Vim needs to have a command run to index the help files in a plugin; otherwise vim’s :help command won’t be able to find them. And vim doesn’t know anything about how to update the plugins later. So I whipped up a quick script to do both of those things. It’s nothing special but it gets the job done: vim-update-plugins.py
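My actual script isn’t reproduced here, but a minimal sketch of the same idea might look like the following. The function names and the assumption that plugins live in vim-8’s pack/*/start and pack/*/opt directories as git checkouts are mine, not necessarily how vim-update-plugins.py is written:

```python
#!/usr/bin/env python3
"""Sketch of a vim-8 plugin updater: pull each plugin's git repo and
re-index its help files so that :help can find them."""
import subprocess
from pathlib import Path


def find_plugins(pack_dir):
    """Return the plugin directories under pack/*/start and pack/*/opt."""
    pack = Path(pack_dir)
    plugins = []
    for kind in ('start', 'opt'):
        plugins.extend(p for p in pack.glob(f'*/{kind}/*') if p.is_dir())
    return sorted(plugins)


def update_plugin(plugin):
    # Pull the latest changes if the plugin is a git checkout
    if (plugin / '.git').exists():
        subprocess.run(['git', '-C', str(plugin), 'pull', '--ff-only'],
                       check=False)
    # Re-index the plugin's help files so :help can find them
    doc = plugin / 'doc'
    if doc.is_dir():
        subprocess.run(['vim', '-u', 'NONE', '-c', f'helptags {doc}',
                        '-c', 'q'], check=False)


if __name__ == '__main__':
    for plugin in find_plugins(Path.home() / '.vim' / 'pack'):
        update_plugin(plugin)
```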
Conway’s Game of Life seems to be a common programming exercise. I had to program it in Pascal when in High School and in C in an intro college programming course. I remember in college, since I had already programmed it before, that I wanted to optimize the algorithm. However, a combination of writing in C and having only a week to work on it didn’t leave me with enough time to implement anything fancy.
A couple years later, I hiked the Appalachian Trail. Seven months away from computers, just hiking day in and day out. One of the things I found myself contemplating when walking up and down hills all day was that pesky Game of Life algorithm and ways that I could improve it.
Fast forward through twenty intervening years of life and experience with a few other programming languages to last weekend. I needed a fun programming exercise to raise my spirits so I looked up the rules to Conway’s Game of Life, sat down with vim and python, and implemented a few versions to test out some of the ideas I’d had kicking around in my head for a quarter century.
This blog post will only contain a few snippets of code to illustrate the differences between each approach. Full code can be checked out from this github repository.
The naive branch is an approximation of how I would have first coded Conway’s Game of Life way back in that high school programming course. The grid of cells is what I would have called a two dimensional array in my Pascal and C days. In Python, I’ve more often heard it called a list of lists. Each entry in the outer list is a row on the grid, which is itself represented by another list. Each entry in the inner lists is a cell in that row of the grid. If a cell is populated, then the list entry contains True. If not, then the list entry contains False.
One populated cell surrounded by empty cells would look like this:
board = [
    [False, False, False],
    [False, True, False],
    [False, False, False],
]
Looking up an individual cell’s status is a matter of looking up an index in two lists: First the y-index in the outer list and then the x-index in an inner list:
# Is there a populated cell at x-axis 0, y-axis 1?
if board[1][0] is True:
    pass
Checking for changes is done by looping through every cell on the Board, and checking whether each cell’s neighbors made the cell fit a rule to populate or depopulate the cell on the next iteration.
for y_idx, row in enumerate(board):
    for x_idx, cell in enumerate(row):
        if cell:
            if not check_will_live((x_idx, y_idx), board, max_x, max_y):
                next_board[y_idx][x_idx] = False
        else:
            if check_new_life((x_idx, y_idx), board, max_x, max_y):
                next_board[y_idx][x_idx] = True
This is a simple mapping of the two-dimensional grid that Conway’s Game of Life takes place on into a computer data structure, followed by a literal translation of Conway’s ruleset onto those cells. However, it seems dreadfully inefficient. Even in college I could see that there should be easy ways to speed this up; I just needed the time to implement them.
The intermediate branch rectifies inefficiencies in how the next generation of cells is checked. The naive branch checks every single cell that is present in the grid. However, in most Conway setups, most of the cells are blank, so if we find a way to ignore most of the blank cells, it will save us a lot of work. We can’t ignore all blank cells, though; if a blank cell has exactly three populated neighbors then it will become populated in the next generation.
The key to satisfying both of those is to realize that all the cells we’re going to need to change will either be populated (in which case, they could die and become empty in the next generation) or they will be a neighbor of a populated cell (in which case, they may become populated next generation). So we can loop through our board and ignore all of the unpopulated cells at first. If we find a populated cell, then we must both check that cell to see if it will die and also check its empty neighbors to see if they will be filled in the next generation.
The major change to implement that is here:
checked_cells = set()

# We still loop through every cell in the board but now
# the toplevel code to do something if the cell is empty
# has been removed.
for y_idx, row in enumerate(board):
    for x_idx, cell in enumerate(row):
        if cell:
            if not check_will_live((x_idx, y_idx), board, max_x, max_y):
                next_board[y_idx][x_idx] = False
            # Instead, inside of the conditional block to
            # process when a cell is populated, there's
            # a new loop to check all of the neighbors of
            # a populated cell.
            for neighbor in (n for n in find_neighbors((x_idx, y_idx), max_x, max_y)
                             if n not in checked_cells):
                # If the cell is empty, then we check whether
                # it should be populated in the next generation
                if not board[neighbor[1]][neighbor[0]]:
                    checked_cells.add(neighbor)
                    if check_new_life(neighbor, board, max_x, max_y):
                        next_board[neighbor[1]][neighbor[0]] = True
Observant readers might also notice that I’ve added a checked_cells set. This tracks which empty cells we’ve already examined to see if they will be populated next generation. Making use of that means that we will only check a specific empty cell once per generation, no matter how many populated cells it’s a neighbor of.
These optimizations to the checking code made things about 6x as fast as the naive approach.
The principle behind the intermediate branch of only operating on populated cells and their neighbors seemed like it should be applicable to the data structure I was storing the grid in as well as the checks. Instead of using fixed length arrays to store both the populated and empty portions of the grid, I figured it should be possible to simply store the populated portions of the grid and then use those for all of our operations.
However, C is a very anemic language when it comes to built-in data structures. If I was going to do that in my college class, I would have had to implement a linked list or a hash map data structure before I even got to the point where I could implement the rules of Conway’s Game of Life. Today, working in Python with its built-in data types, it was very quick to implement a data structure of only the populated cells.
For the gridless branch, I replaced the 2d array with a set. The set contains tuples of (x-coordinate, y-coordinate) which define the populated cells. One populated cell surrounded by empty cells would look like this:
board = set(( (1,1), ))
Using a set had all sorts of advantages:

Initializing the board only has to record the populated cells:

- board = []
- for y in range(0, max_y):
-     for x in range(0, max_x):
-         board[x][y] = (x, y) in initial_dataset
+ board = set()
+ for x, y in initial_dataset:
+     board.add((x, y))

Drawing the board only loops through the populated cells instead of the whole grid:

- for y in range(0, max_y):
-     for x in range(0, max_x):
-         if board[x][y]:
-             screen.addstr(y, x, ' ', curses.A_REVERSE)
+ for (x, y) in board:
+     screen.addstr(y, x, ' ', curses.A_REVERSE)

The board only stores the populated cells rather than an entry of True or False for every cell on the grid.

Creating the next generation’s board no longer requires copying the whole grid:

- next_board = copy.deepcopy(board)
+ next_board = set()

Performing the checks and updating the board only walks the populated cells:

for cell in board:
    if check_will_live(cell, board, max_x, max_y):
        next_board.add(cell)
    babies = check_new_life(cell, board, max_x, max_y)
    next_board.update(babies)
board = next_board

And looking up whether a cell is populated becomes a simple membership test:

- if board[cell[0]][cell[1]]:
+ if cell in board:
Gridless made the program about 3x faster than intermediate, or about 20x faster than naive.
Despite being 3x faster than intermediate, gridless was still doing some extra work. The code in the master branch attempts to correct that.
The most important change was that empty cells in gridless were being checked for each populated cell that was its neighbor. Adding a checked_cells set like the intermediate branch had to keep track of that ensures that we only check whether an empty cell should be populated in the next generation one time:
checked_cells = set()
for cell in board:
    if check_will_live(cell, board, max_x, max_y):
        next_board.add(cell)
    checked_cells.add(cell)
    # Pass checked_cells into check_new_life so that
    # checking skips empty neighbors which have already
    # been checked this generation
    babies, barren = check_new_life(cell, board, checked_cells, max_x, max_y)
    checked_cells.update(babies)
    checked_cells.update(barren)
The other, relatively small, optimization was to use Python’s built-in least-recently-used cache decorator on the find_neighbors function. This allows us to skip computing the set of neighboring cells when the same cells are requested soon after each other. In the set-based code, find_neighbors is called for the same cell back to back quite frequently, so this did have a noticeable impact.
+ @functools.lru_cache()
  def find_neighbors(cell, max_x, max_y):
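For illustration, here is a guess at what the memoized find_neighbors could look like; only the decorator usage comes from the actual change, the function body here is my own sketch of a bounded-grid neighbor lookup:

```python
import functools


@functools.lru_cache()
def find_neighbors(cell, max_x, max_y):
    """All in-bounds neighbors of `cell`; repeated calls with the
    same arguments are served from the LRU cache."""
    x, y = cell
    return frozenset((nx, ny)
                     for nx in range(x - 1, x + 2)
                     for ny in range(y - 1, y + 2)
                     if (nx, ny) != (x, y)
                     and 0 <= nx < max_x and 0 <= ny < max_y)


find_neighbors((1, 1), 3, 3)
find_neighbors((1, 1), 3, 3)  # second call is a cache hit
```

Note that the cell must be hashable (a tuple works, a list would not) for lru_cache to memoize on it.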
These changes sped up the master branch an additional 30% over what gridless had achieved or nearly 30x as fast as the naive implementation that we started with.
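Putting the set-based ideas together, a self-contained sketch of one generation looks something like this. It uses an unbounded grid and my own function names rather than the exact code in the repository:

```python
from collections import Counter


def find_neighbors(cell):
    """The eight cells surrounding `cell` on an unbounded grid."""
    x, y = cell
    return [(x + dx, y + dy)
            for dx in (-1, 0, 1)
            for dy in (-1, 0, 1)
            if (dx, dy) != (0, 0)]


def step(board):
    """Advance one generation, only ever looking at populated cells
    and their neighbors, as in the gridless approach."""
    # Count, for each cell, how many populated neighbors it has
    counts = Counter(n for cell in board for n in find_neighbors(cell))
    # A cell is populated next generation if it has three populated
    # neighbors, or two populated neighbors and is already populated
    return {cell for cell, n in counts.items()
            if n == 3 or (n == 2 and cell in board)}


# A blinker oscillates between a horizontal and a vertical bar
blinker = {(0, 1), (1, 1), (2, 1)}
```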
Never ever, ever raise a regular exception in a Python signal handler.
This is probably the best advice that I had never heard. But after looking into an initial analysis of a timeout decorator bug it’s advice that I wish was prominently advertised. So I’m publishing this little tale to let others know about this hidden gotcha so that, just maybe, when you have an opportunity to do this, a shiver will run down your spine, your memory will be tickled, and you’ll be able to avoid the pitfalls that I’ve recorded here.
Signals are a means provided by an operating system to inform programs of events that aren’t part of their normal program flow. If you’re on a UNIX-like operating system and hit Control-C to cancel a program running in the shell, you’ve used a signal. Control-C in the shell sends a SIGINT (Interrupt) signal to the program. The program receives the SIGINT and a signal handler (a function which is written to take appropriate action when a signal is received) is invoked to handle it. In most cases, for SIGINT, the signal handler tries to complete any pressing work (for instance, to finish writing data to a file so that the file isn’t left in a half-written state) and then causes the program to exit.
Python provides a signal library which is very similar to the C API underneath Python. Python does a little bit of behind the scenes work to make sure that signals appear to be interacting with Python code rather than interfering with the interpreter’s implementation (meaning that signals will appear to happen between Python’s byte code instructions rather than leaving a Python instruction half-completed).
If you search the internet for how to implement a timeout in Python, you’ll find tons of examples using signals, including one from the standard library documentation and one which is probably where the design for our original decorator came from. So let’s create some quick test code to show how the signal works to implement a timeout:
import signal
import time

def handler(signum, frame):
    print('Signal handler called with signal', signum)
    raise OSError("timeout exceeded!")

def long_function():
    time.sleep(10)

# Set the signal handler and a 1-second alarm
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(1)

# This sleeps for longer than the alarm
start = time.time()
try:
    long_function()
except OSError as e:
    duration = time.time() - start
    print('Duration: %.2f' % duration)
    raise
finally:
    signal.signal(signal.SIGALRM, old_handler)
    signal.alarm(0)  # Disable the alarm
This code is adapted from the example of implementing a timeout in the standard library documentation. We first define a signal handler named handler() which will raise an OSError when it’s invoked. We then define a function, long_function(), which is designed to take longer to run than our timeout. Then we hook everything together:

1. We install handler() as the function to invoke when a SIGALRM occurs.
2. We set an alarm to send SIGALRM after 1 second.
3. We call long_function().
4. Finally, we clean up by restoring the old SIGALRM handler as the function to invoke if a new SIGALRM occurs and then disabling the alarm that we had previously set.

When we run the code we see that the signal handler raises the OSError as expected:
$ /bin/time -p python3 test.py
Signal handler called with signal 14
Duration: 1.00
Traceback (most recent call last):
  File "test.py", line 18, in <module>
    long_function()
  File "test.py", line 9, in long_function
    time.sleep(10)
  File "test.py", line 6, in handler
    raise OSError("timeout exceeded!")
OSError: timeout exceeded!
real 1.04
Although long_function() takes 10 seconds to complete, the SIGALRM fires after 1 second. That causes handler() to run, which raises the OSError with the message timeout exceeded!. The exception propagates to our toplevel where it is caught and prints Duration: 1.00 before re-raising so we can see the traceback. We see that the output of /bin/time roughly agrees with the duration we calculated within the program… just a tad over 1 second to run the whole thing.

It’s time to make our long_function() code a little less trivial.
import signal
import time

def handler(signum, frame):
    print('Signal handler called with signal', signum)
    raise OSError("timeout exceeded!")


def long_function():
    try:
        with open('/etc/passwd', 'r') as f:
            data = f.read()
        # Simulate reading a lot of data
        time.sleep(10)
    except OSError:
        # retry once:
        with open('/etc/passwd', 'r') as f:
            data = f.read()
        time.sleep(10)

# Set the signal handler and a 1-second alarm
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(1)

start = time.time()
# This sleeps for longer than the alarm
try:
    long_function()
except OSError as e:
    duration = time.time() - start
    print('Duration: %.2f' % duration)
    raise
finally:
    signal.signal(signal.SIGALRM, old_handler)
    signal.alarm(0)  # Disable the alarm
We’ve changed long_function(), lines 9-20, to do two new things:

1. It now reads a file and sleeps to simulate processing a lot of data.
2. If an OSError occurs while doing that, it catches the exception and retries once.
So what happens when we run this version?
$ /bin/time -p python3 test.py
Signal handler called with signal 14
real 11.07
As you can see from the output, our program still fired the SIGALRM. And the signal handler still ran. But after it ran, everything else seems to be different. Apparently, the OSError didn’t propagate up the stack to our toplevel so we didn’t print the duration. Furthermore, we ended up waiting for approximately 10 seconds in addition to the 1 second we waited for the timeout. What happened?

The key to understanding this is that when the signal handler is invoked, Python adds the call to the signal handler onto its current call stack. (The call stack roughly represents the nesting of functions that leads up to where the code is currently executing. So, in our example, inside of the signal handler, the call stack would start with the module toplevel, then long_function(), and finally handler(). If you look at the traceback from the first example, you can see exactly that call stack leading you through all the function calls to the point where the exception was raised.) When the signal handler raises its exception, Python unwinds the call stack one function at a time to find an exception handler (not to be confused with our signal handler) which can handle the exception.
Where was the program waiting when SIGALRM was emitted? It was on line 14, inside of long_function(). So Python acts as though handler() was invoked directly after that line. And when handler() raises its OSError exception, Python unwinds the call stack from handler() to long_function() and sees that on line 15 there’s an except OSError:, so Python lets it catch the signal handler’s OSError instead of propagating it further up the stack. In our code, that exception handler decides to retry reading the file, which is where the second 10-second delay comes from as we read the file again. Since the SIGALRM was already used up, the timeout doesn’t fire this time. The rest of the bug progresses from there: long_function() now waits the full 10 seconds before returning because there’s no timeout to stop it. It then returns normally to its caller. The caller doesn’t receive an OSError exception, so it doesn’t fire its own OSError exception handler which would have printed the Duration.
There are even less intuitive ways that this bug can be provoked. For instance:
def long_function():
    time.sleep(0.8)
    try:
        time.sleep(0.1)
        time.sleep(0.1)
        time.sleep(0.1)
    except Exception:
        pass
In this version, we don’t have the context clue that we’re raising OSError in the signal handler and mistakenly catching it in long_function(). An experienced Python coder will realize that except Exception is sufficiently broad to be catching OSError, but in the clutter of other function calls, and without the same named exception to provoke their suspicions, they might not realize that the occasional problems with the timeout not working could be triggered by this late exception handler.
This is also problematic if the programmer is having to orient themselves off of a traceback. That would lead them to look inside of long_function() for the source of an OSError. They won’t find it there because it’s raised inside of the signal handler, which is outside of the function.
import zipfile

def long_function():
    time.sleep(0.9)
    zipfile.is_zipfile('my_file.zip')
In this case, there’s no exception handling in our code to clue us in. If you look at the implementation of zipfile.is_zipfile(), though, you’ll see that there’s an except OSError: inside of it. If the timeout’s alarm ends up firing while you are inside of that code, is_zipfile() will just as happily swallow the OSError as an exception handler inside of long_function() would have.
There are ways to make the timeout functionality more robust. For instance, let’s define our handler like this:
import functools
import signal

class OurTimeout(BaseException):
    pass

def handler(timeout, signum, frame):
    print('Signal handler called with signal', signum)
    signal.alarm(timeout)
    raise OurTimeout("timeout exceeded!")

# toplevel code:
one_second_timeout = functools.partial(handler, 1)
old_handler = signal.signal(signal.SIGALRM, one_second_timeout)
signal.alarm(1)
try:
    long_function()
except OurTimeout:
    print('Timeout from our timeout function')
The first thing you can see is that this now raises a custom timeout exception which inherits from BaseException. This is more robust than using standard exceptions rooted at Exception because well-written Python code won’t catch exceptions inheriting directly from BaseException. It’s not foolproof, however, because it’s only convention that prevents someone from catching BaseException. And no matter how good a Python programmer you are, I’m sure you’ve been lazy enough to write code like this at least once:
def long_function():
    try:
        with open('/etc/passwd', 'r') as f:
            f.read()
    except:  # I'll fix the possible exception classes later
        rollback()
Bare except: catches any exception, including BaseException, so using a bare except will still break this timeout implementation.
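To make that failure mode concrete, here is a tiny self-contained demonstration (the names are mine) of a bare except swallowing a BaseException-derived timeout before it can reach the caller:

```python
class OurTimeout(BaseException):
    """A timeout exception rooted at BaseException, as recommended above."""


def long_function():
    try:
        # Stand-in for work that gets interrupted by the timeout
        # signal handler raising OurTimeout
        raise OurTimeout('timeout exceeded!')
    except:  # bare except: catches even BaseException subclasses
        return 'recovered (timeout swallowed)'


# The caller never sees OurTimeout; the timeout is silently defeated
result = long_function()
```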
The second thing this implementation changes is to reset the alarm inside of the signal handler. This is helpful because it means the code will be able to time out multiple times on its way back to the toplevel exception handler. In our retry example, both the initial attempt and the retry attempt would time out, so long_function() would end up taking only two seconds and then fail due to our timeout. However, there are still problems. For instance, the following code ends up taking timeout * 3 seconds instead of just timeout seconds, because the exception handler prevents us from hitting the break statement, so we keep hitting the timeout:
def long_function():
    for i in range(0, 3):
        try:
            time.sleep(10)
        except OSError:
            pass
        else:
            break
The following code (arguably buggy, because you *should* disable the alarm as the first thing in the recovery code instead of the last) can end up aborting the toplevel code’s recovery effort if the timeout is too short:
try:
    long_function()
except OSError:
    rollback_partial_long_function()
    signal.alarm(0)
So even though this code is better than before, it is still fragile, error prone, and can do the wrong thing even with code that isn’t obviously wrong.
When I was looking at the problem with the timeout decorator in Ansible, what struck me was that the decorator is applied outside of the function it tells to time out, but the timeout was occurring, and potentially being processed, inside of that function. That meant that it would always be unintuitive and error prone for someone trying to use the decorator:
@timeout.timeout(1)
def long_function():
    try:
        time.sleep(10)
    except:
        # Try an alternative
        pass

try:
    long_function()
except timeout.TimeoutError:
    print('long_function did not complete')
When looking at the code, the assumption is that on the inside there’s long_function, then outside of it, the timeout code, and outside of that the caller. So the expectation is that an exception raised by the timeout code should only be processed by the caller since exceptions in Python only propagate up the call stack. Since the decorator’s functionality was implemented via a signal handler, though, that expectation was disappointed.
To solve this, I realized that the way signals and exceptions interact would never allow exceptions to propagate correctly. So I switched from using a signal to using one thread for the timeout and one thread for running the function. Simplified, that flow looks like this. (You can look at the code for the new decorator in Ansible if you’re okay with the GPLv3+ license. The following code is all mine in case you want to re-implement the idea without the GPLv3+ restrictions.)
import multiprocessing
import multiprocessing.pool
import time

def timeout(seconds=10):
    def decorator(func):
        def wrapper(*args, **kwargs):
            pool = multiprocessing.pool.ThreadPool(processes=1)
            results = pool.apply_async(func, args, kwargs)
            pool.close()
            try:
                return results.get(seconds)
            except multiprocessing.TimeoutError:
                raise OSError('Timeout expired after: %s' % seconds)
            finally:
                pool.terminate()
        return wrapper
    return decorator

@timeout(1)
def long_func():
    try:
        time.sleep(10)
    except OSError:
        print('Failure!')
    print('end of long_func')

try:
    long_func()
except OSError as e:
    print('Success!')
    raise
Edit: Davi Aguiar pointed out that the snippet was leaving the thread running after timeout. The example has been updated to add a pool.terminate() call inside of a finally: to take care of reaping the thread after the timeout expires.
As you can see, I create a multiprocessing.pool.ThreadPool with a single thread in it. Then I run the decorated function in that thread. I use results.get(seconds) with the timeout we set to get the results or raise an exception if the timeout is exceeded. If the timeout was exceeded, then I raise OSError to be like our first example.

If all goes well, the exception handling inside of long_func() won’t have any effect on what happens. Let’s see:
$ python2 test.py
Success!
Traceback (most recent call last):
  File "test.py", line 49, in <module>
    long_func()
  File "test.py", line 35, in wrapper
    raise OSError('Timeout expired after: %s' % seconds)
OSError: Timeout expired after: 1
Yep, as you can see from the stack trace, the OSError is now being raised from the decorator, not from within long_func(). So only the toplevel exception handler has a chance to handle the exception. This leads to more intuitive code and hopefully fewer bugs down the road.
Signal handlers can be used for more than just timeouts. And they can do other things besides trying to cancel an ongoing function. For instance, the Apache web server has a signal handler for the HUP (HangUP) signal. When it receives that, it reloads its configuration from disk. If you take care to catch any potential exceptions during that process, you shouldn’t run across these caveats, because these problems only apply to raising an exception from the signal handler.
When you do want to exit a function via a signal handler, I would be a little hesitant: you can never entirely escape from the drawbacks above, and threading provides a more intuitive interface for both the called and the calling code. A few practices make it more robust, however. As mentioned above:
And one that wasn’t mentioned before:
It’s better to make an ad hoc exception-raising signal handler that handles a specific problem inside of a function rather than attempting to place it outside of the function. For instance:
def timeout_handler(signum, frame):
    raise BaseException('Timeout')

def long_function():
    try:
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(1)
        time.sleep(10)  # Placeholder for the real,
                        # potentially blocking code
    except BaseException:
        print('Timed out!')
    finally:
        signal.signal(signal.SIGALRM, old_handler)
        signal.alarm(0)

long_function()
Why is it better to do it this way? If you do this, you localize the exception catching due to the signal handler with the code that could potentially raise the exception. It isn’t foolproof as you are likely still going to call helper functions (even in the stdlib) which could handle a conflicting exception but at least you are more clearly aware of the fact that the exception could potentially originate from any of your code inside this function rather than obfuscating all of that behind a function call.
Last year I played around with using jsonschema for validating some data that I was reading into my Python programs. The API for the library was straightforward but the schema turned out to be a pretty sprawling affair: stellar-schema.json
This past weekend I started looking at adding some persistence to this program in the form of SQLAlchemy-backed data structures. To make those work right, I decided that I had to change the schema for these data files as well. Looking at the jsonschema, I realized that I was having a hard time remembering what all the schema keywords meant and how to modify the file; never a good sign! So I decided to reimplement the schema in something simpler.
At work, we use a library called voluptuous. One of the nice things about it is that the containers are Python data structures and the types are Python type constructors. So simple things can be very simple:
from voluptuous import Schema

schema = [
    {'hosts': str,
     'gather_facts': bool,
     'tasks': [],
     }
]
The unfortunate thing about voluptuous is that the reference documentation is very bad. It doesn’t have any recipe-style documentation which can teach you how to best accomplish tasks. So when you do have to reach deeper to figure out how to do something a bit more complex, you can try to find an example in the README.md (a pretty good resource but there’s no table of contents so you’re often left wondering whether what you want is documented there or not) or you might have to hunt around with google for a blog post or stackoverflow question that explains how to accomplish that. If someone else hasn’t discovered an answer already…. well, then, voluptuous may well have just the feature you need but you might never find it.
The feature that I needed today was to integrate Python-3.4’s Enum type with voluptuous. The closest I found was this feature request which asked for enum support to be added. The issue was closed with a few examples of “code that works” which didn’t quite explain all the underlying concepts to a new voluptuous user like myself. I took away three ideas:

1. Schema(Coerce(EnumType)) was supposed to work
2. Schema(EnumType) was supposed to work but…
3. Schema(EnumType) doesn’t work the way the person who opened the issue (or I) would get value from.

I was still confused, but at least now I had some pieces of code to try out. So here’s what my first series of tests looked like:
from enum import Enum
import voluptuous as v

# My eventual goal:
data_to_validate = {"type": "one"}

MyTypes = Enum("MyTypes", ['one', 'two', 'three'])

s1 = v.Schema(MyTypes)
s2 = v.Schema(v.Coerce(MyTypes))

s1('one')        # validation error (figured that would be the case from the ticket)
s1(MyTypes.one)  # Works but not helpful for me
s2('one')        # validation error (Thought this one would work...)
Hmm… so far, this isn’t looking too hopeful. The only thing that I got working isn’t going to help me validate my actual data. Well, let’s google for what Coerce actually does….
A short while later, I think I see what’s going on. Coerce will attempt to mutate the data given to it into a new type via the function it is given. In the case of an Enum, you can call the Enum on the actual values backing the Enum, not the symbolic labels. For the symbolic labels, you need to use square bracket notation. Square brackets are syntax for the __getitem__ magic method, so maybe we can pass that in to get what we want:
MyTypes = Enum("MyTypes", ['one', 'two', 'three'])

s2 = v.Schema(v.Coerce(MyTypes))
s3 = v.Schema(v.Coerce(MyTypes.__getitem__))

s2(1)      # This validates
s3('one')  # And so does this!  Yay!
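To see the value-vs-label distinction on its own, here it is in plain Python with voluptuous out of the mix:

```python
from enum import Enum

MyTypes = Enum("MyTypes", ['one', 'two', 'three'])

# Calling the Enum looks members up by their backing *values*...
assert MyTypes(1) is MyTypes.one

# ...while square brackets (the __getitem__ magic method) look them
# up by their symbolic *labels*
assert MyTypes['one'] is MyTypes.one
```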
Okay, so now we think that this all makes sense… but there’s actually one more wrinkle that we have to work out. It turns out that Coerce only marks a value as Invalid if the function it’s given throws a TypeError or ValueError. __getitem__ throws a KeyError, so guess what:
>>> symbolic_only({"type": "five"})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/badger/.local/lib/python3.6/site-packages/voluptuous/schema_builder.py", line 267, in __call__
    return self._compiled([], data)
  File "/home/badger/.local/lib/python3.6/site-packages/voluptuous/schema_builder.py", line 587, in validate_dict
    return base_validate(path, iteritems(data), out)
  File "/home/badger/.local/lib/python3.6/site-packages/voluptuous/schema_builder.py", line 379, in validate_mapping
    cval = cvalue(key_path, value)
  File "/home/badger/.local/lib/python3.6/site-packages/voluptuous/schema_builder.py", line 769, in validate_callable
    return schema(data)
  File "/home/badger/.local/lib/python3.6/site-packages/voluptuous/validators.py", line 95, in __call__
    return self.type(v)
  File "/usr/lib64/python3.6/enum.py", line 327, in __getitem__
    return cls._member_map_[name]
KeyError: 'five'
The code throws a KeyError instead of a voluptuous Invalid exception. Okay, no problem: we just have to remember to wrap __getitem__ in a function which raises ValueError if the name isn’t present. Anything else you should be aware of? Well, for my purposes I only want the enum’s symbolic names to match, but what if you wanted either the symbolic names or the actual values to work (s2 or s3)? For that, you can combine this with voluptuous’s Any function. Here’s what those validators will look like:
from enum import Enum

import voluptuous as v

data_to_validate = {"type": "one"}
MyTypes = Enum("MyTypes", ['one', 'two', 'three'])

def mytypes_validator(value):
    try:
        MyTypes[value]
    except KeyError:
        raise ValueError(f"{value} is not a valid member of MyTypes")
    return value

symbolic_only = v.Schema({"type": v.Coerce(mytypes_validator)})
symbols_and_values = v.Schema({"type": v.Any(v.Coerce(MyTypes),
                                             v.Coerce(mytypes_validator),
                                             )})

symbolic_only(data_to_validate)       # Hip Hip!
symbols_and_values(data_to_validate)  # Hooray!
symbols_and_values({"type": 1})       # If this is what you *really* want
symbolic_only({"type": 1})            # If you want implementation to remain hidden
symbols_and_values({"type": 5})       # Prove that invalids are getting caught
In my last post I used pytest to write a unittest for code which wrote to sys.stdout on Python2 and sys.stdout.buffer on Python3. Here’s that code again:
# byte_writer.py
import sys
import six

def write_bytes(some_bytes):
    # some_bytes must be a byte string
    if six.PY3:
        stdout = sys.stdout.buffer
    else:
        stdout = sys.stdout
    stdout.write(some_bytes)
and the unittest to test it:
# test_byte_writer.py
import io
import sys

from byte_writer import write_bytes

def test_write_byte(capfdbinary):
    write_bytes(b'\xff')
    out, err = capfdbinary.readouterr()
    assert out == b'\xff'
This all works great but as we all know, there’s always room for improvement.
When working on code that’s intended to run on both Python2 and Python3, you often hear the advice that making sure that every line of your code is executed during automated testing is extremely helpful. This is, in part, because several classes of error that are easy to introduce in such code are easily picked up by tests which simply exercise the code. For instance:
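A Python2-only builtin hiding on an untested line is one such class: it’s syntactically valid on Python3 but explodes the moment it runs, so any test that merely executes the line catches the porting bug. A minimal illustration (my own example, not from the original post):

```python
def py2_only_text(obj):
    # `unicode` does not exist on Python3: this line is syntactically
    # valid, so only *executing* it reveals the porting bug
    return unicode(obj)  # noqa: F821 -- Python2-only builtin

def test_execution_catches_the_bug():
    # On Python3, merely running the line raises NameError, so a test
    # that does nothing but execute the code still fails usefully
    try:
        py2_only_text("hi")
    except NameError:
        caught = True
    else:
        caught = False
    assert caught
```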
These benefits are over and above the behaviours that your tests were specifically written to check for.
Once you accept that you want to strive for 100% coverage, though, the next question is how to get there. The Python coverage library can help. It integrates with your unittest framework to track which lines of code and which branches have been executed over the course of running your unittests. If you’re using pytest to write your unittests like I am, you will want to install the pytest-cov package to integrate coverage with pytest.
If you recall my previous post, I had to use a recent checkout of pytest to get my unittests working, so we have to make sure not to overwrite that version when we install pytest-cov. It’s easiest to create a test-requirements.txt file to make sure that we get the right versions together:
$ cat test-requirements.txt
pytest-cov
git+git://github.com/pytest-dev/pytest.git@6161bcff6e3f07359c94a7be52ad32ecb882214
$ pip install --user --upgrade -r test-requirements.txt
$ pip3 install --user --upgrade -r test-requirements.txt
And then we can run pytest to get coverage information:
$ pytest --cov=byte_writer --cov-report=term-missing --cov-branch
======================= test session starts ========================
test_byte_writer.py .

---------- coverage: platform linux, python 3.5.4-final-0 ----------
Name             Stmts   Miss Branch BrPart  Cover   Missing
------------------------------------------------------------
byte_writer.py       7      1      2      1    78%   10, 7->10

===================== 1 passed in 0.02 seconds =====================
Yay! The output shows that currently 78% of byte_writer.py is executed when the unittests are run. The Missing column shows two types of missing things. The 10 means that line 10 is not executed. The 7->10 means that there’s a branch in the code where only one of its conditions was executed. Since the two missing pieces coincide, we probably only have to add a single test case that satisfies the second code branch path to reach 100% coverage. Right?
If we take a look at lines 7 through 10 in byte_writer.py we can see that this might not be as easy as we first assumed:
if six.PY3:
    stdout = sys.stdout.buffer
else:
    stdout = sys.stdout
Which code path executes here depends on the Python version the code runs under. Line 8 is executed when we run on Python3 and Line 10 is skipped. When run on Python2 the opposite happens: Line 10 is executed and Line 8 is skipped. We could mock out the value in six.PY3 to hit both code paths, but since sys.stdout only has a buffer attribute on Python3, we’d have to mock that out too. And that would lead us back into conflict with pytest capturing stdout, as we figured out in my previous post.
Taking a step back, it also doesn’t make sense that we’d test both code paths on the same run. Each code path should be executed when the environment is correct for it to run and we’d be better served by modifying the environment to trigger the correct code path. That way we also test that the code is detecting the environment correctly. For instance, if the conditional was triggered by the user’s encoding, we’d probably run the tests under different locale settings to check that each encoding went down the correct code path. In the case of a Python version check, we modify the environment by running the test suite under a different version of Python. So what we really want is to make sure that the correct branch is run when we run the test suite on Python3 and the other branch is run when we execute it under Python2. Since we already have to run the test suite under both Python2 and Python3 this has the net effect of testing all of the code.
So how do we achieve that?
Coverage has the ability to match lines in your code against a string and then exclude those lines from the coverage report. This allows you to tell coverage that a branch of code will never be executed and thus shouldn’t contribute to the list of unexecuted code. By default, the only string to match is “pragma: no cover“. However, “pragma: no cover” will unconditionally exclude the lines it matches, which isn’t really what we want. We do want to test for coverage of the branch, but only when we’re running on the correct Python version. Luckily, the matched lines are customizable and, further, they can include environment variables. The combination of these two abilities means that we can add exclusion lines that differ when we run on Python2 and Python3. Here’s how to configure coverage to do what we need it to:
$ cat .coveragerc
[report]
exclude_lines=
    pragma: no cover
    pragma: no py${PYTEST_PYMAJVER} cover
This .coveragerc includes the standard matched line, pragma: no cover, and our addition, pragma: no py${PYTEST_PYMAJVER} cover. The addition uses an environment variable, PYTEST_PYMAJVER, so that we can vary the string that’s matched when we invoke pytest.
Next we need to change the code in byte_writer.py so that the special strings are present:
if six.PY3:  # pragma: no py2 cover
    stdout = sys.stdout.buffer
else:  # pragma: no py3 cover
    stdout = sys.stdout
As you can see, we’ve added the special comments to the two conditional lines. Let’s run this manually and see if it works now:
$ PYTEST_PYMAJVER=3 pytest --cov=byte_writer --cov-report=term-missing --cov-branch
======================= test session starts ========================
test_byte_writer.py .

---------- coverage: platform linux, python 3.5.4-final-0 ----------
Name             Stmts   Miss Branch BrPart  Cover   Missing
------------------------------------------------------------
byte_writer.py       6      0      0      0   100%

==================== 1 passed in 0.02 seconds =====================

$ PYTEST_PYMAJVER=2 pytest-2 --cov=byte_writer --cov-report=term-missing --cov-branch
======================= test session starts ========================
test_byte_writer.py .

--------- coverage: platform linux2, python 2.7.13-final-0 ---------
Name             Stmts   Miss Branch BrPart  Cover   Missing
------------------------------------------------------------
byte_writer.py       5      0      0      0   100%

===================== 1 passed in 0.02 seconds =====================
Well, that seemed to work! Let’s run it once more with the wrong PYTEST_PYMAJVER value to show that coverage is still recording information on the branches that are supposed to be used:
$ PYTEST_PYMAJVER=3 pytest-2 --cov=byte_writer --cov-report=term-missing --cov-branch
======================= test session starts ========================
test_byte_writer.py .

-------- coverage: platform linux2, python 2.7.13-final-0 --------
Name             Stmts   Miss Branch BrPart  Cover   Missing
------------------------------------------------------------
byte_writer.py       6      1      0      0    83%   8

===================== 1 passed in 0.02 seconds =====================
Yep. When we specify the wrong PYTEST_PYMAJVER value, the coverage report shows that the missing line is included as an unexecuted line. So that seems to be working.
Just one more thing… it’s kind of a pain to have to set the PYTEST_PYMAJVER variable with every test run, isn’t it? Wouldn’t it be better if pytest would automatically set that for you? After all, pytest knows which Python version it’s running under, so it should be able to. I thought so too, so I wrote pytest-env-info to do just that. When installed, pytest-env-info will set PYTEST_VER, PYTEST_PYVER, and PYTEST_PYMAJVER so that they are available to pytest_cov and other, similar plugins which can use environment variables to configure themselves. It’s available on pypi, so all you have to do to enable it is add it to your requirements so that pip will install it, and then run pytest:
$ cat test-requirements.txt
pytest-cov
git+git://github.com/pytest-dev/pytest.git@6161bcff6e3f07359c94a7be52ad32ecb8822142
pytest-env-info
$ pip3 install --user -r test-requirements.txt
$ pytest --cov=byte_writer --cov-report=term-missing --cov-branch
======================== test session starts =========================
plugins: env-info-0.1.0, cov-2.5.1
test_byte_writer.py .

---------- coverage: platform linux2, python 2.7.14-final-0 ----------
Name             Stmts   Miss Branch BrPart  Cover   Missing
------------------------------------------------------------
byte_writer.py       5      0      0      0   100%

====================== 1 passed in 0.02 seconds ======================
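If you’re curious how a plugin can do that, the core idea is tiny: pytest calls the pytest_configure hook at startup, and the hook can export environment variables for .coveragerc to interpolate. A minimal sketch of the idea (my illustration, not pytest-env-info’s actual source):

```python
# conftest.py -- a minimal sketch of the idea behind pytest-env-info
# (my illustration, not the plugin's actual source)
import os
import sys

def pytest_configure(config):
    # pytest invokes this hook at startup; exporting the running
    # interpreter's major version lets .coveragerc interpolate
    # ${PYTEST_PYMAJVER} without the user setting it by hand
    os.environ["PYTEST_PYMAJVER"] = str(sys.version_info[0])
```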