Sunday, July 10, 2016

Using Generators and Coroutines to Merge Tabular Data (Drill Holes)

I have some mining drill hole data that I need to merge into an old vendor FORTRAN input format.  Basically I do a series of SQL pulls from the drillhole database to csv files, then merge the data.  My methodology has been a bit brute force in matching the separate parts of the drill hole data (lists, opening and closing of files to find matching holes, etc.).  My thought was that I could do this more elegantly and efficiently by iterating through the files with generators.

The ability of generators to communicate with each other via the send() method intrigued me.  I had always been a bit shy about using this language feature.  My csv problem gave me a justification for checking it out.

The reference I used was Dr. Dave Beazley's 2009 Pycon Tutorial.  He does a nice job of explaining things as well as dispatching good advice.  (I disobeyed the good advice in the interest of shoehorning coroutines into my solution; I'll cover this below.)  Beazley defines a coroutine in the sense of generators and the "yield" keyword as generators where "yield" is used more generally.  That is the context I'm using the word "coroutine" in this post.

Given my problem of a one (drill hole start survey) to many (drill hole interval values) relationship, I attempted a very simple (perhaps oversimplified) toy program demo of what I wanted to do with real data:

def coroutinex(subgenerator):
    """
    Generator function that consumes
    a key value sent from a higher
    level generator.  This generator
    yields two tuples of the form
    (<boolean>, data).  The boolean
    value indicates whether the key
    matches the data.

    Returns a generator.
    """
    while True:
        # One entry point for send()/reset.
        keyx = yield
        subdatatop = next(subgenerator)
        if subdatatop[0] == keyx:
            yield (True, subdatatop)
            for subdataloop in subgenerator:
                if subdataloop[0] == keyx:
                    yield (True, subdataloop)
                else:
                    yield (False, subdataloop)
                    break
 

def toplevelgen(topleveliter, coroutinex):
    """
    Top level generator function.

    subgenerator is a generator
    that this generator sends
    a key value to.  The
    subgenerator yields a two
    tuple that communicates if
    the key matches or not.

    Returns a generator.
    """
    # Get sub generator/coroutine initialized.
    coroutinex.send(None)
    # Variable for dealing with return
    # from sub-generator/coroutine.
    subvalue = False
    for keyx in topleveliter:
        yield keyx
        if subvalue:
            yield subvalue
        subvalue = coroutinex.send(keyx)
        # Get sub generator/coroutine re-initialized
        # after send() reset.
        if subvalue is None:
            # XXX - hack
            subvalue = coroutinex.send(keyx)
        yield subvalue
        for submessage in coroutinex:
            # XXX - another hack to deal with yield of None.
            if not submessage:
                continue
            subvalue = submessage
            # if submessage[0] is True, kick it out.
            if submessage[0]:
                yield submessage
            else:
                # Keep subvalue for after keyvalue
                # yield at top.
                break

topleveliter = range(44, 55)
keysx = [44, 44, 44, 45, 45, 45, 45, 45,
         46, 46, 46, 46, 46, 46, 46, 46,
         47, 47, 47, 48, 48, 48, 48, 48,
         49, 49, 49, 49, 49, 49, 50, 50,
         51, 51, 51, 51, 51, 51, 51, 51,
         52, 52, 52, 52, 52, 52, 52, 52,
         53, 53, 53, 53, 53, 53, 53, 53,
         54, 54, 54, 54, 54, 54, 54, 54]

sequencex = range(1, len(keysx) + 1)
subgenerator = zip(keysx, sequencex)

gensub = coroutinex(subgenerator)
genmain = toplevelgen(topleveliter, gensub)

for x in genmain:
     print(x)



Output:

44
(True, (44, 1))
(True, (44, 2))
(True, (44, 3))
45
(False, (45, 4))
(True, (45, 5))
(True, (45, 6))
(True, (45, 7))
(True, (45, 8))
46
(False, (46, 9))
(True, (46, 10))
(True, (46, 11))
(True, (46, 12))
(True, (46, 13))
(True, (46, 14))
(True, (46, 15))
(True, (46, 16))
47
(False, (47, 17))
(True, (47, 18))
(True, (47, 19))
48
(False, (48, 20))
(True, (48, 21))
(True, (48, 22))
(True, (48, 23))
(True, (48, 24))
49
(False, (49, 25))
(True, (49, 26))
(True, (49, 27))
(True, (49, 28))
(True, (49, 29))
(True, (49, 30))
50
(False, (50, 31))
(True, (50, 32))
51
(False, (51, 33))
(True, (51, 34))
(True, (51, 35))
(True, (51, 36))
(True, (51, 37))
(True, (51, 38))
(True, (51, 39))
(True, (51, 40))
52
(False, (52, 41))
(True, (52, 42))
(True, (52, 43))
(True, (52, 44))
(True, (52, 45))
(True, (52, 46))
(True, (52, 47))
(True, (52, 48))
53
(False, (53, 49))
(True, (53, 50))
(True, (53, 51))
(True, (53, 52))
(True, (53, 53))
(True, (53, 54))
(True, (53, 55))
(True, (53, 56))
54
(False, (54, 57))
(True, (54, 58))
(True, (54, 59))
(True, (54, 60))
(True, (54, 61))
(True, (54, 62))
(True, (54, 63))
(True, (54, 64))


Back to Dr. Beazley's advice - he doesn't recommend this - even though "yield" is the keyword, it means two different things in two different contexts.  Do not mix generator and coroutine functionality.  I'm going ahead in this post and doing it anyway.  I don't have an excuse.  It does remind me of some old Bob Dylan lyrics:

Now the rainman gave me two cures
Then he said, "Jump right in"
The one was Texas medicine
The other was just railroad gin
An' like a fool I mixed them
An' it strangled up my mind


It's OK, Bob, some of us just need to learn things the hard way.
 
Onward.

A brief diversion on drill holes - the data for a small scale (about 2,000 feet or less) geotechnical or gelogic drill hole come back in three parts:

1) collar - where the hole starts in space (coordinates).


2) surveys - where the hole ends up going in space relative to the collar (drill pipe has proven to be amazingly flexible when passing through rock).

3) assays - usually the hole is sampled along intervals and chemically or physically analyzed.  The assay intervals may or may not coincide with survey intervals.

Clear as (drilling) mud?  Great - back to Python.

The problem:

Three tabular csv dumps from SQL - a collar file, a survey file, and an assay file.  Each has a unique key in the first column that matches across files (the drill hole key).  On the SQL side I have ensured that there are no orphan key rows in any of the three files and that all three are sorted on the key.

I present the sanitized ouput here first - it will give some context to the domain specific parts of the code:

XXXXX,XXXXXX.XXXX,XXXXXXX.XXXX,XXXX.XXXX,0.0000,0.0000,26.4529
XXXXX,0.0000,1.1925,1.1925,283.5688,-13.5310 
XXXXX,1.1925,4.2760,3.0836,284.6224,1.9328      SURVEYS
XXXXX,4.2760,6.3799,2.1039,280.2829,-3.1334       GO
XXXXX,6.3799,9.7024,3.3225,282.5794,2.3632       HERE
XXXXX,9.7024,11.8701,2.1677,285.4406,-1.1631     AFTER
XXXXX,11.8701,13.6920,1.8219,275.9462,-5.0698    COLLAR
XXXXX,13.6920,17.1199,3.4279,285.4561,1.9560    LOCATION
XXXXX,17.1199,19.6944,2.5746,279.2318,-0.7344
XXXXX,19.6944,22.5857,2.8913,282.1947,4.3241
XXXXX,22.5857,24.1879,1.6022,283.8367,-1.7525
XXXXX,24.1879,26.4529,2.2650,287.3820,13.4805
XXXXX                             <----- LEGACY DRILLHOLE NUMBER
XXXXX,X.XXXX,X.XXXX,X.XXXX,X.XX,X.XX,X.XX, etc.
XXXXX,X.XXXX,X.XXXX,X.XXXX,X.XX,X.XX,X.XX, etc.
XXXXX,X.XXXX,X.XXXX,X.XXXX,X.XX,X.XX,X.XX, etc.            ASSAYS
XXXXX,X.XXXX,X.XXXX,X.XXXX,X.XX,X.XX,X.XX, etc.              GO
XXXXX,X.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX, etc.            HERE
XXXXX,XX.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX,XX.XX, etc.
XXXXX,XX.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX,XX.XX, etc.
XXXXX,XX.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX,XX.XX, etc.
XXXXX,XX.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX,XX.XX, etc.
XXXXX,XX.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX,XX.XX, etc.
XXXXX,XX.XXXX,XX.XXXX,X.XXXX,X.XX,X.XX,X.XX,XX.XX, etc.
                                                <----- BLANK LINE
XXXXXX,XXXXXX.XXXX,XXXXXXX.XXXX,XXXX.XXXX,0.0000,0.0000,23.5411
XXXXXX,0.0000,2.5781,2.5781,135.0157,2.3341
XXXXXX,2.5781,5.0351,2.4570,137.1873,5.5353
XXXXXX,5.0351,7.3706,2.3354,135.2276,7.7020
XXXXXX,7.3706,9.9168,2.5462,136.4253,6.4493
                .
                .
                .
                .
                .
                .
                .
               etc.


And the code (sorry about the size - it got messier than I would have hoped):

#!C:\Python35\python

"""
Parse collar, survey, and assay dumps for
trenches from vendor drill hole RDBMS.

Write specially formatted data file for
consumption by old vendor FORTRAN
routine 201.
"""

import csv
from collections import namedtuple
from collections import OrderedDict

COLLAR = './data/collar.csv'
SURVEY = './data/survey.csv'
ASSAYS = './data/assays.csv'
DAT201 = './data/TR.dat'

# collar (ssit) fields
ID = 'drillholeid'
NAME = 'drillholename'
DATE = 'drillholedate'
LEGACY = 'drillholehistoricname'
X = 'collarx'
Y = 'collary'
Z = 'collarz'
AZ = 'azimuth'
DIP = 'dip'
LEN = 'drillholelength'

COLLARFIELDS = [ID, NAME, DATE, LEGACY, X, Y, Z,
                AZ, DIP, LEN]

# survey fields
FROM = 'fromx'
TO = 'depthto'
SAMPLEN = 'surveylength'
AZ = 'azimuth'
DIP = 'dip'

SURVEYFIELDS = [ID, NAME, DATE, LEGACY, FROM, TO,
                SAMPLEN, AZ, DIP]

# assay fields
AFROM = 'assayfrom'
ATO = 'assayto'
AI = 'assayinterval'
ASSAY1 = 'assay1'
ASSAY2 = 'assay2'
ASSAY3 = 'assay3'
ASSAY4 = 'assay4'
ASSAY5 = 'assay5'
ASSAY6 = 'assay6'
ASSAY7 = 'assay7'
ASSAY8 = 'assay8'

ASSAYFIELDS = [ID, NAME, LEGACY, AFROM, ATO, AI, ASSAY1,
               ASSAY2, ASSAY3, ASSAY4, ASSAY5, ASSAY6, ASSAY7, ASSAY8]

ASSAYFORMAT = '.2f'
SURVEYFORMAT = '.4f'

COMMA = ','

# Output for 201 file format.
# Collars.
COLOUTPUTCOLS = [X, Y, Z, AZ, DIP, LEN]
COLFMTOUTPUT = [(attribx, SURVEYFORMAT) for attribx in COLOUTPUTCOLS]
# Surveys.
SURVOUTPUTCOLS = [FROM, TO, SAMPLEN, AZ, DIP]
SURVFMTOUTPUT = [(attribx, SURVEYFORMAT) for attribx in SURVOUTPUTCOLS]
# Assays.
ASSYOUTPUTCOLS = [AFROM, ATO, AI, ASSAY1, ASSAY2, ASSAY3, ASSAY4, ASSAY5,
                  ASSAY6, ASSAY7, ASSAY8]
ASSYOUTPUTFMTS = 3 * [SURVEYFORMAT] + 8 * [ASSAYFORMAT]
# Have to use this repeatedly - hence list.
ASSYFMTOUTPUT = list(zip(ASSYOUTPUTCOLS, ASSYOUTPUTFMTS))

RETCHAR = '\n'

# For tracking which dataset we're
# dealing with.
SURVEYSUBDATA = 'survey'
ASSAYSUBDATA = 'assay'

# For survey/assay dictionary.
COR = 'coroutine'
FMT = 'format'
LAST = 'lastvalue'
END = 'end'

INFOMESSAGE = 'Now doing hole number {0} . . .'

def makecsvdatagenerator(csvrdr, ntname, ntfields):
    """
    Returns a generator that yields csv
    row records as named tuple objects.

    csvrdr is the csv.reader object.

    ntname is the name given to the
    collections.namedtuple object.

    ntfields is the list of field names
    for the collections.namedtuple object.
    """
    namedtup = namedtuple(ntname, ntfields)
    return (namedtup(*linex) for linex in csvrdr)

def formatassay(numstring, formatx):
    """
    Returns a string representing a float
    that typically is in 0.00 format, but
    other float formats can be applied.

    numstring is a string representing a float.

    formatx is the desired format (Python 3 format string).
    """
    return(format(float(numstring), formatx))

def getnumericstrings(record, formats):
    """
    Returns list of strings.

    record is a collections.namedtuple instance.

    formats is a list of two-lists of namedtuple
    attributes and numeric string formats to be
    applied to each attribute's value.
    """
    return [formatassay(record.__getattribute__(pairx[0]),
                                                pairx[1])
            for pairx in formats]

def coroutinex(subgenerator):
    """
    Generator function.
 
    Consumes key value and yields
    two tuple of (<boolean>,
    next(subgenerator)) in response.
    boolean value indicates
    whether key matches first
    value of subgenerator namedtuple.

    subgenerator is a generator of
    namedtuples.

    Returns a generator.
    """
    while True:
        keyx = yield
        subdatatop = next(subgenerator)
        if subdatatop.drillholeid == keyx:
            yield (True, subdatatop)
            for subdataloop in subgenerator:
                if subdataloop.drillholeid == keyx:
                    yield (True, subdataloop)
                else:
                    yield (False, subdataloop)
                    break
        # Case where only one interval in
        # drill hole.
        else:
            yield (False, subdatatop)

def formatdataline(record, formats):
    """
    Prepare record as a line
    of text for write to file.

    record is a collections.namedtuple
    object.

    formats is a list of two tuples of
    namedtuple attributes and numeric
    string formats.

    Returns string.
    """
    recordline = [record.drillholehistoricname]
    recordline.extend(getnumericstrings(record,
                                        formats))
    return COMMA.join(recordline) + RETCHAR

def dealwithsend(subgen, sendval):
    """
    Helper function to clean up code.
    Deals with initial receipt of
    None value upon send() and
    re-sends value.

    Sends value sendval to
    generator/coroutine subgen.

    Returns two tuple of (<boolean>,
    <collections.namedtuple>).
    """
    retval = subgen.send(sendval)
    if retval is None:
        retval = subgen.send(sendval)
    return retval

def dealwithyieldrecord(survassay, subdata):
    """
    Helper function to clean up code.

    Formats values for write to file.

    survassay is a dictionary of values.

    subdata is the dictionary key that
    tells which data is being handled
    (survey or assay).
    """
    return formatdataline(survassay[subdata][LAST][1],
                          survassay[subdata][FMT])

def cyclecollars(collargen,
                 survassay):
    """
    Generator function that yields
    data (strings) for write to a
    a specially formatted drill hole
    file.

    This is the top level generator
    for working the merging of
    drillhole data (collars, surveys,
    assays).

    survassay is a collections.OrderedDict
    object that references the respective
    survey and assay generators and holds
    information for tracking which subset
    of data (surveys or assays) are being
    worked.
    """
    for record in collargen:
        keyx = record.drillholeid
        label = record.drillholehistoricname
        survassay[SURVEYSUBDATA][END] = label + RETCHAR
        print(INFOMESSAGE.format(label))
        yield formatdataline(record, COLFMTOUTPUT)
        for subdata in survassay:
            fmt = survassay[subdata][FMT]
            if survassay[subdata][LAST]:
                yield dealwithyieldrecord(survassay, subdata)
            subvalue = dealwithsend(survassay[subdata][COR], keyx)
            # Case where only one interval.
            if not subvalue[0]:
                survassay[subdata][LAST] = subvalue
                yield survassay[subdata][END]
                continue
            yield formatdataline(subvalue[1], fmt)
            for submessage in survassay[subdata][COR]:
                # End of iteration.
                if submessage is None:
                    yield survassay[subdata][END]
                    break
                if submessage[0]:
                    yield formatdataline(submessage[1], fmt)
                else:
                    survassay[subdata][LAST] = submessage
                    yield survassay[subdata][END]
                    break

def main():
    """
    Parse csv dumps from SQL and write
    drillhole data fields for import
    to old vendor FORTRAN based binary
    files.

    Side effect function.
    """
    with open(COLLAR, 'r') as colx:
        colcsv = csv.reader(colx)
        collargen = makecsvdatagenerator(colcsv,
                                         'collars',
                                         COLLARFIELDS)
        with open(SURVEY, 'r') as svgx:
            survcsv = csv.reader(svgx)
            survgen = makecsvdatagenerator(survcsv,
                                           'surveys',
                                           SURVEYFIELDS)
            surveycoroutinex = coroutinex(survgen)
            with open(ASSAYS, 'r') as assx:
                assycsv = csv.reader(assx)
                assygen = makecsvdatagenerator(assycsv,
                                               'assays',
                                               ASSAYFIELDS)
                assaycoroutinex = coroutinex(assygen)
                with open(DAT201, 'w') as d201:
                    # Get sub generators/coroutines initialized.
                    surveycoroutinex.send(None)
                    assaycoroutinex.send(None)
                    surveyassay = OrderedDict()
                    surveyassay[SURVEYSUBDATA] = {COR:surveycoroutinex,
                                                  FMT:SURVFMTOUTPUT,
                                                  LAST:None,
                                                  END:None}
                    surveyassay[ASSAYSUBDATA] = {COR:assaycoroutinex,
                                                 FMT:ASSYFMTOUTPUT,
                                                 LAST:None,
                                                 END:RETCHAR}
                    colgenx = cyclecollars(collargen,
                                           surveyassay)
                    for linex in colgenx:
                        d201.write(linex)
    print('Done')

if __name__ == '__main__':
    main()
 


The bad news: this was more difficult with a real world dataset than I anticipated.  Beazley's admonition was an apt one.
 
The good news:  it does perform better than my previous brute force implementations.  From the standpoint of iterating through datasets and not wasting resources (even with the polling or interrupting or whatever facilitates the generator communication closer to the metal), this is a better implementation.  Also, I learned a bit more about the "yield" keyword.

Thanks for stopping by.