Monday, October 20, 2014

subprocess.Popen() or Abusing a Home-grown Windows Executable

Each month I redo 3D block model interpolations for a series of open pits at a distant mine.  Those of you who follow my twitter feed often see me tweet, "The 3D geologic block model interpolation chuggeth . . ."  What's going on is that I've got all the processing power maxed out dealing with millions of model blocks and thousands of data points.  The machine heats up, and with the fan running it sounds like a DC-9 warming up before flight.

All that said, running everything roughly in parallel is more efficient time-wise than running it sequentially.  An hour of chugging is better than four.  The way I've been doing this is with the Python (2.7) subprocess module's Popen, running my five interpolations in parallel.  Our Python programmer Lori originally wrote this to run in sequence for a different set of problems.  I bastardized it for my own purposes.

The subprocess part of the code is relatively straightforward.  Function startprocess() in my code covers that.
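
Boiled down, the pattern looks roughly like the sketch below.  The paths are made-up placeholders, not the real project paths, and the sleep is only there to give the interactive program time to put up its prompt.

import subprocess
import time

# Hypothetical paths for illustration only.
PROGRAM = 'C:/path/to/consultantprogram.exe'
DRIVER = '"C:/path/to/KDriverASSAY1kwatCBT.csv"'   # quoted - see point 4 below

# stdout goes to a per-run file so parallel runs don't get jumbled together;
# stdin is piped so the script can answer the program's prompts.
outputfile = open('C:/path/to/ASSAY1output.txt', 'w')
procx = subprocess.Popen(PROGRAM, stdin=subprocess.PIPE, stdout=outputfile)
time.sleep(2)                 # let the program ask for the driver file
procx.stdin.write(DRIVER)     # driver (config) file path
procx.stdin.write('\n')       # return character to accept it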

What makes this problem a little more challenging:

1) we're dealing with a vendor-supplied executable . . . without an API or source . . . that's interactive (you can't feed it the config file path; it asks for it).  This results in a number of time.sleep() and <process>.stdin.write() calls that can be brittle.

2) getting the processes started, as I just mentioned, is easy.  Figuring out when to stop or kill them requires knowledge of the app and how it generates output.  I've gone for an ugly but effective check of report file contents (a rough sketch of this check, along with points 3 and 4, follows this list).

3) while waiting for the processes to finish their work, I need to know things are working and what's going on.  I've accomplished this by reporting the data files' sizes in MB.

4) the executable isn't designed for a centralized code base (typically all scripts are kept in a folder for the specific project or pit), so it only allows about 100 characters in the file paths fed to it.  I've omitted this from my sanitized version of the code, but it made things even messier than they are below.  Also, I don't know if all Windows programs do this, but the paths need to be inside quotes - the path kept breaking on the colon (:) when not quoted.
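
Here is a rough, self-contained sketch of points 2 through 4.  The file names are placeholders, and ENDTEXT stands in for the end-of-report marker the consultant's program writes; the real script does the same checks per assay inside checkprocess().

import os

# Placeholder names for one assay's files.
DRIVERFILE = 'C:/path/to/KDriverASSAY1kwatCBT.csv'
DATFILE = 'C:/path/to/ASSAY1K.csv'
RPTFILE = 'C:/path/to/ASSAY1.RPT'
ENDTEXT = 'END <consultant> REPORT'

# 4) quote the path before writing it to the program's stdin;
#    unquoted, it breaks on the colon after the drive letter.
quoteddriver = '"{0}"'.format(DRIVERFILE)

# 3) report progress as the data file's size in MB
#    (right shift by 20 bits = integer division by 2**20).
if os.path.isfile(DATFILE):
    print('{0} is now {1}MB'.format(DATFILE, os.stat(DATFILE).st_size >> 20))

# 2) the run is finished once the end-of-report text shows up in the report file.
finished = False
if os.path.isfile(RPTFILE):
    finished = ENDTEXT in open(RPTFILE).read()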

Basically, this is a fairly ugly problem and a script that requires babysitting while it runs.  That's OK; it beats the alternative (running it sequentially while watching each run).  I've tried to adhere to DRY (don't repeat yourself) as much as possible, but I suspect this could be improved upon.

The reason I'm blogging this is that I suspect there are other people out there who have to do the same sort of thing with their data.  It doesn't have to be a mining problem.  It can be anything that requires intensive computation across voluminous data with an executable that wasn't designed with a Python API.

Notes: 

1) I've omitted the file multirunparameters.py that's in an import statement.  It has a bunch of paths and names that are relevant to my project, but not to the reader's programming needs.

2) Python 2.7 is listed at the top of the file as "mpython."  This is the Python that our mine planning vendor ships, and it ties into their quite capable Python API.  The executable I call with subprocess.Popen() is a Windows executable provided by a consultant independent of the mine planning vendor.  It just makes sense to package this interpolation inside the mine planning vendor's multirun (~ batch file) framework as part of an overall working of the 3D geologic block model.  The script exits as soon as this part of the batch is complete.  I've inserted a 10-second pause at the end just to allow a quick look at the screen before it disappears.

#!C:/MineSight/x64/mpython

"""
Interpolate grades with <consultant> program
from text files.
"""


import argparse
import subprocess as subx
import os
import collections as colx

import time
from datetime import datetime as dt


# Lookup file of constants, pit names, assay names, paths, etc.
import multirunparameters as paramsx


parser = argparse.ArgumentParser()
# 4 letter argument like 'kwat'
# Feed in at command line.
parser.add_argument('pit', help='four letter, lower case pit abbreviation (kwat)', type=str)
args = parser.parse_args()
PIT = args.pit


pitdir = paramsx.PATHS[PIT]
pathx = paramsx.BASEPATH.format(pitdir)
controlfilepathx = paramsx.CONTROLFILEPATH.format(pitdir)


timestart = dt.now()
print(timestart)


PROGRAM = 'C:/MSPROJECTS/EOMReconciliation/2014/Multirun/AllPits/consultantprogram.exe'

ENDTEXT = 'END <consultant> REPORT'
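# checkprocess() looks for this text in each report file
# to decide when an interpolation run is finished.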

# These names are the only real difference between pits.
# Double quote is for subprocess.Popen object's stdin.write method
# - Windows path breaks on colon without quotes.
ASSAY1DRIVER = 'KDriverASSAY1{:s}CBT.csv"'.format(PIT)
ASSAY2DRIVER = 'KDriverASSAY2{:s}CBT.csv"'.format(PIT)
ASSAY3DRIVER = 'KDriverASSAY3_{:s}CBT.csv"'.format(PIT)
ASSAY4DRIVER = 'KDriverASSAY4_{:s}CBT.csv"'.format(PIT)
ASSAY5DRIVER = 'KDriverASSAY5_{:s}CBT.csv"'.format(PIT)


RETCHAR = '\n'
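# Written to the program's stdin after the driver file name
# to step through its interactive prompts.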

ASSAY1 = 'ASSAY1'
ASSAY2 = 'ASSAY2'
ASSAY3 = 'ASSAY3'
ASSAY4 = 'ASSAY4'
ASSAY5 = 'ASSAY5'


NAME = 'name'
DRFILE = 'driver file'
OUTPUT = 'output'
DATFILE = 'data file'
RPTFILE = 'report file'


# data, report files
ASSAY1K = 'ASSAY1K.csv'
ASSAY1RPT = 'ASSAY1.RPT'

ASSAY2K = 'ASSAY2K.csv'
ASSAY2RPT = 'ASSAY2.RPT'

ASSAY3K = 'ASSAY3K.csv'
ASSAY3RPT = 'ASSAY3.RPT'

ASSAY4K = 'ASSAY4K.csv'
ASSAY4RPT = 'ASSAY4.RPT'

ASSAY5K = 'ASSAY5K.csv'
ASSAY5RPT = 'ASSAY5.RPT'


OUTPUTFMT = '{:s}output.txt'

ASSAYS = {1:{NAME:ASSAY1,
             DRFILE:controlfilepathx + ASSAY1DRIVER,
             OUTPUT:pathx + OUTPUTFMT.format(ASSAY1),
             DATFILE:pathx + ASSAY1K,
             RPTFILE:pathx + ASSAY1RPT},
          2:{NAME:ASSAY2,
             DRFILE:controlfilepathx + ASSAY2DRIVER,
             OUTPUT:pathx + OUTPUTFMT.format(ASSAY2),
             DATFILE:pathx + ASSAY2K,
             RPTFILE:pathx + ASSAY2RPT},
          3:{NAME:ASSAY3,
             DRFILE:controlfilepathx + ASSAY3DRIVER,
             OUTPUT:pathx + OUTPUTFMT.format(ASSAY3),
             DATFILE:pathx + ASSAY3K,
             RPTFILE:pathx + ASSAY3RPT},
          4:{NAME:ASSAY4,
             DRFILE:controlfilepathx + ASSAY4DRIVER,
             OUTPUT:pathx + OUTPUTFMT.format(ASSAY4),
             DATFILE:pathx + ASSAY4K,
             RPTFILE:pathx + ASSAY4RPT},
          5:{NAME:ASSAY5,
             DRFILE:controlfilepathx + ASSAY5DRIVER,
             OUTPUT:pathx + OUTPUTFMT.format(ASSAY5),
             DATFILE:pathx + ASSAY5K,
             RPTFILE:pathx + ASSAY5RPT}}


DELFILE = 'delete file'
INTERP = 'interp'
SLEEP = 'sleep'
MSGDRIVER = 'message driver'
MSGRETCHAR = 'message return character'
FINISHED1 = 'finished one assay'
FINISHEDALL = 'finished all interpolations'
TIMEELAPSED = 'time elapsed'
FILEEXISTS = 'report file exists'
DATSIZE = 'data file size'
DONE = 'number interpolations finished'
DATFILEEXIST = 'data file not yet there'
SIZECHANGE = 'report file changed size'


# for converting to megabyte file size from os.stat()
BITSHIFT = 20
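# (st_size >> 20 is integer division by 2**20 - bytes to MB)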

# sleeptime - 5 seconds
SLEEPTIME = 5

FINISHED = 'finished'
RPTFILECHSIZE = """
        
Report file for {:s}
changed size; killing process . . .

"""

MESGS = {DELFILE:'\n\nDeleting {} . . .\n\n',
         INTERP:'\n\nInterpolating {:s} . . .\n\n',
         SLEEP:'\nSleeping 2 seconds . . .\n\n',
         MSGDRIVER:'\n\nWriting driver file name to stdin . . .\n\n',
         MSGRETCHAR:'\n\nWriting retchar to stdin for {:s} . . .\n\n',
         FINISHED1:'\n\nFinished {:s}\n\n',
         FINISHEDALL:'\n\nFinished interpolation.\n\n',
         TIMEELAPSED:'\n\n{:d} elapsed seconds\n\n',
         FILEEXISTS:'\n\nReport file for {:s} exists . . .\n\n',
         DATSIZE:'\n\nData file size for {:s} is now {:d}MB . . .\n\n',
         DONE:'\n\n{:d} out of {:d} assays are finished . . .\n\n',
         DATFILEEXIST:"\n\n{:s} doesn't exist yet . . .\n\n",
         SIZECHANGE:RPTFILECHSIZE}


def cleanslate():
    """
    Delete all output files prior to interpolation
    so that their existence can be tracked.
    """
    for key in ASSAYS:
        files = (ASSAYS[key][DATFILE],
                 ASSAYS[key][RPTFILE],
                 ASSAYS[key][OUTPUT])
        for filex in files:
            print(MESGS[DELFILE].format(filex))
            if os.path.exists(filex) and os.path.isfile(filex):
                os.remove(filex)
    return 0


def startprocess(assay):
    """
    Start <consultant program> run for given interpolation.

    Return subprocess.Popen object,
    file object (output file).
    """
    print(MESGS[INTERP].format(ASSAYS[assay][NAME]))
    # XXX - I hate time.sleep - hack
    # XXX - try to re-route standard output so that
    #       it's not all jumbled together.
    print(MESGS[SLEEP])
    time.sleep(2)
    # output file for stdout
    f = open(ASSAYS[assay][OUTPUT], 'w')
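    # stdin is piped so the driver file name and return characters
    # can be written to the program; stdout is redirected to the
    # per-assay output file so parallel runs aren't jumbled together.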
    procx = subx.Popen(PROGRAM, stdin=subx.PIPE, stdout=f)
    print(MESGS[SLEEP])
    time.sleep(2)
    # XXX - problem, starting up Excel CBT 22JUN2014
    #       Ah - this is what happens when the <software usb licence>
    #            key is not attached :-(
    print(MESGS[MSGDRIVER])
    print('\ndriver file = {:s}\n'.format(ASSAYS[assay][DRFILE]))
    procx.stdin.write(ASSAYS[assay][DRFILE])
    print(MESGS[SLEEP])
    time.sleep(2)
    # XXX - this is so jacked up -
    #       no idea what is happening when
    print(MESGS[MSGRETCHAR].format(ASSAYS[assay][NAME]))
    procx.stdin.write(RETCHAR)
    print(MESGS[SLEEP])
    time.sleep(2)
    print(MESGS[MSGRETCHAR].format(ASSAYS[assay][NAME]))
    procx.stdin.write(RETCHAR)
    print(MESGS[SLEEP])
    time.sleep(2)
    return procx, f


def crosslookup(assay):
    """
    From assay string, get numeric
    key for ASSAYS dictionary.

    Returns integer.
    """
    for key in ASSAYS:
        if assay == ASSAYS[key][NAME]:
            return key
    return 0


def checkprocess(assay, assaydict):
    """
    Check to see if assay
    interpolation is finished.

    assay is the item in question
    (ASSAY1, ASSAY2, etc.).

    assaydict is the operating dictionary
    for the assay in question (not currently
    used inside this function).

    Returns True if finished.
    """
    # Report file indicates process finished.
    assaykey = crosslookup(assay)
    rptfile = ASSAYS[assaykey][RPTFILE]
    datfile = ASSAYS[assaykey][DATFILE]
    if os.path.exists(datfile) and os.path.isfile(datfile):
        # Report size of file in MB.
        datfilesize = os.stat(datfile).st_size >> BITSHIFT
        print(MESGS[DATSIZE].format(assay, datfilesize))
    else:
        # Doesn't exist yet.
        print(MESGS[DATFILEEXIST].format(datfile))
    if os.path.exists(rptfile) and os.path.isfile(rptfile):
        # XXX - not the most efficient way,
        #       but this checking the file appears
        #       to work best.
        f = open(rptfile, 'r')
        txt = f.read()
        f.close()
        # XXX - hack - gah.
        if txt.find(ENDTEXT) > -1:
            # The end-of-report text is present,
            # so this interpolation is finished.
            print(MESGS[SIZECHANGE].format(assay))
            print(MESGS[SLEEP])
            time.sleep(2)
            return True
    return False


PROCX = 'process'
OUTPUTFILE = 'output file'


# Keeps track of files and progress of <consultant program>.
opdict = colx.OrderedDict()


# get rid of preexisting files
cleanslate()


# start all five roughly in parallel
# ASSAYS keys are numbers
for key in ASSAYS:
    # opdict - ordered with assay names as keys
    namex = ASSAYS[key][NAME]
    opdict[namex] = {}
    assaydict = opdict[namex]
    assaydict[PROCX], assaydict[OUTPUTFILE] = startprocess(key)
    # Initialize active status of process.
    assaydict[FINISHED] = False


# For count.
numassays = len(ASSAYS)
# Loop until all finished.
while True:
    # Cycle until done then break.
    # Sleep SLEEPTIME seconds at a time and check between.
    time.sleep(SLEEPTIME)
    # Count.
    i = 0
    for key in opdict:
        assaydict = opdict[key]
        if not assaydict[FINISHED]:
            status = checkprocess(key, assaydict)
            if status:
                # kill process when report file changes
                opdict[key][PROCX].kill()
                assaydict[FINISHED] = True
                i += 1
        else:
            i += 1
    print(MESGS[DONE].format(i, numassays))
    # all done
    if i == numassays:
        break


print(MESGS[FINISHEDALL])
timeend = dt.now()
elapsed = timeend - timestart


print(MESGS[TIMEELAPSED].format(elapsed.seconds))
print('\n\n{:d} elapsed minutes\n\n'.format(elapsed.seconds/60))


# Allow quick look at screen.
time.sleep(10)



2 comments:

  1. I also have to deal a lot with running Windows executables in batch / parallel. For me it's typically running old compiled Fortran-based analysis codes across a range of inputs. I'm often wrapping these tools (or workflows involving several tools) into a GUI that streamlines things. This led me to using PyQt and then PySide (more openly licensed Python Qt bindings). The great thing I discovered was that Qt's QProcess object can perform asynchronous interaction with executables. There are events/signals that get triggered, like QProcess.readyReadStandardOutput or QProcess.readyReadStandardError, that allow you to respond to specific queries asked by the executable. In the callback/slot connected to the readyReadStandardOutput signal you specify what to do in response, which can be to write text to the process. For example, I check for "Press to continue" then automatically write a Return character to the process. My codes often output lines to the command window indicating progress, which I parse into a percentage and use to update a progress bar, and possibly a countdown timer. There is also a signal for when the process is finished so that you can perform cleanup or post-processing on the Python side. I've enjoyed PySide's signals and slots enough that I've even used it without GUIs to write asynchronous / event-based command line stuff. In fact, I've found it's best if your automation code works independently of the GUI, and then the start/stop/progress/finished signals can be connected to the GUI later without affecting functionality.

    You may not have the ability to install extra stuff in your environment. If not, since you're on Python3, perhaps the new asyncio module in 3.4 can interact with processes? I'm sure there are other libraries lighter than Qt that can handle reading the stdout pipe and writing to stdin interactively. Here's some interesting discussion which shows a wide range of solutions: http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python?lq=1. Some of the later answers on there are library implementations of the top answer.

  2. @Brett Thank you. Those suggestions are really good. If I need to improve on this, I'll take your input and investigate it. I know from reading (mostly job announcements) that PyQt gets used a bit in scientific apps that require a GUI. Right now, the hack I presented gets the job done, but it definitely could use improvement. CBT
