#!/usr/bin/env python2.5 # Testscript template # (C) 2008, Hans-Christian Esperer # hc@hcesperer.org # Licensed under the new (3-clause) BSD license. # Please adapt to your needs. from sys import exit, argv, stderr, stdout import sys from os import popen from time import time, sleep from random import seed, random, randint from pprint import pformat import md5 import re import socket # Error codes # Please make sure you have read http://ctf.hcesperer.org/gameserver/errorcodes.html ERR_OK = 0 # All OK ERR_CONNECTION = 1 # Connection refused / connection attempt timed out ERR_WRONGFLAG = 5 # A wrong flag / no flag was returned ERR_FUNCLACK = 9 # The service lacks functionality ERR_TIMEOUT = 13 # Done automatically by the scorebot under normal circumstances ERR_UNKNOWN = 17 # Temporary status -- do not use unless you've got a very good reason to do so ERR_GENERIC = 21 # Be sure to include a descriptive message if using this error ERR_PROTOCOL = 25 # Protocol violation # Add error descriptions here. # Note (to zeri): do not insult people. ;-) shit = {'conn': ("Unable to connect to the service", ERR_CONNECTION), 'greeting': ("The server didn't greet correctly", ERR_PROTOCOL), 'allok': ("Everything is fine", ERR_OK)} # This class can be used to read from a socket linewise. It has built-in # overflow protection. class LineReader: def __init__(self, socket): self.s = socket self.buf = '' def readline(self): while True: pos = self.buf.find("\n") if pos != -1: line, self.buf = self.buf[:pos], self.buf[pos + 1:] return line frag = self.s.recv(8192) if len(self.buf) > 8192: raise EOFException() frag = frag.replace("\r", "") if (len(self.buf) == 0) and (len(frag) == 0): raise EOFException() if len(frag) == 0: buf, self.buf = self.buf, '' return buf # EOF self.buf = self.buf + frag def readlineparams(self): line = self.readline() # print "RECV: %s" % repr(line) line = line.split(" ", 1) try: line = line[1] except: line = line[0] parts = line.split(":", 1) sparts = parts[0].strip().split(" ") del parts[0] return sparts + parts def die(reason): """This function should be called to terminate the testscript, whether successfully or unsuccessfully. An exitcode must always be specified.""" try: msg, exitcode = shit[reason] except: msg, exitcode = reason, 1 stdout.write(msg) stdout.flush() exit(exitcode) def randflag(): """This function can be used to generate random (invalid, but indistinguishable from real) flags.""" flagpart = "ABCDEFabcdef0123456789" return ''.join([flagpart[randint(0, len(flagpart) - 1)] for i in range(8)]) def expect(lr, pat, err): """Wait for a certain pattern to be read on the socket. The matched pattern is returned if found, otherwise this function calls die with the specified err.""" what = re.compile(pat) i = 0 try: while True: line = lr.readline() i = i + 1 if i > 128: die('toomuch') sys.stderr.write("%s| %s\n" % (pat, line)) res = what.search(line) if res != None: sys.stderr.write('>> found string %s (%s) <<\n' % (repr(pat), pformat(res.groups()))) return res.groups() except: sys.stderr.write("Didn't catch string %s\n" % pat) die(err) # getsock can be used to create a TCP socket def getsock(): return socket.socket(socket.AF_INET, socket.SOCK_STREAM) # The store function is called to store a flag. def store((ip, flagid, flag)): seed(time()) s = getsock() # TODO: specify correct port here try: s.connect((ip, 25)) except: die('conn') # Specify an appropriate timeout s.settimeout(10.0) # You should always use the LineReader to read from the socket, # unless, of course, you implement a custom binary protocol. # Use extra care in that case. Use a lot of care, anyway;-) lr = LineReader(s) # TODO: add correct handling code here # The random() function should be used to test different situations if random() < 0.5: pass else: pass (hostid, bar) = expect(lr, '[0-9]{3} (.+)ESMTP(.+)', 'greeting') sys.stderr.write("Host identified itself as %s\n" % hostid) sys.stderr.write("===== SUCCESSFULLY STORED FLAG =====\n") die('allok') # The retrieve function is called to retrieve a flag. def retrieve((ip, flagid, flag)): seed(time()) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(10.0) # TODO: specify correct port here try: s.connect((ip, 1986)) except: die('conn') lr = LineReader(s) # TODO: add correct handling code here. if random() < 0.5: pass else: pass print "===== SUCCESSFULLY RETRIEVED FLAG =====" die('allok') def test((ip)): stdout.write("Testing is integrated in store/retrieve functionality.\n") exit(0) # service is fully functional if __name__ == '__main__': { 'store': store, 'retrieve': retrieve, 'test': test }[argv[1]](tuple(argv[2:])) try: pass except SystemExit, e: exit(e.code) except Exception, e: stderr.write("ERROR! ") stderr.write(str(e)) print "Error: " + str(e) print "Usage: %s store|retrieve IP FLAGID FLAG" % argv[0] print " %s test IP" % argv[0] finally: stderr.flush() #vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: