Project Flow

Project Flow is intended to allow a user to flow easily through the different stages of software complexity. A common situation for an academic or a research software engineer (RSE) is to have written a quick script to answer a specific question, only for it to turn out to be useful in other contexts. The script then grows and grows until complexity hurts its usefulness, at which point a software developer would think “oops, I should really make this modular.” ProjectFlow provides several modalities useful to researchers, ranging from a simple drop-in solution to a complex scripting framework. To do this, ProjectFlow manages folders and defines a tree of tasks that can easily be run in parallel where needed while keeping track of task dependencies. ProjectFlow borrows heavily in concept (though not in code) from the task_graph library produced by Rich Sharp but adds a predefined file structure suited to research and exploration tasks.

Project Flow is intended to flow easily from the situation where you have coded a script that grows and grows until you think “oops, I should really make this modular.” Thus, it has several modalities useful to researchers, ranging from a simple drop-in solution to a complex scripting framework. Similar to the Stages of Complexity page, we will introduce progressively more complex usages, starting with the simplest examples and working towards distributed parallelization on complex task trees. If you would like to skip the stages and go straight to a fully-functioning example, go to Standard ProjectFlow run file.

Starter code

We will use the third level of complexity from the previous example as the launching point for how to use ProjectFlow. The code below adds a few additional steps we will work with.

import os
import numpy as np
from osgeo import gdal

def raster_to_array(raster_input_path):
    ds = gdal.Open(raster_input_path)
    print("Reading " + raster_input_path +'. This might take a while!')
    array = ds.ReadAsArray()
    return array

# Load the raster
raster_input_path = 'data/yield.tif' # TODO[make all these actually point to data in hazelbean]
array = raster_to_array(raster_input_path)

# Set no data values equal to zero

# Method 1 (creates new array)
array_ndv_fix = np.where(array == -9999, 0, array)

# Method 2 (inplace)
array[array == -9999] = 0

# Sum the raster
array_sum = np.sum(array_ndv_fix)

# Calculate the average value on >0 cells

## First create a binary map of where there is positive value
non_zero = np.where(array_ndv_fix > 0, 1, 0) 

## Count those
n_non_zero = np.sum(non_zero)

## Calculate the average
mean = array_sum / n_non_zero

## Write the value to a file
output_path = 'data/mean_yield.txt'  # illustrative output location
with open(output_path, 'w') as f:
    f.write(str(mean))

print('Mean of positive cells: ' + str(mean))

What are ProjectFlow and the Task Tree?

ProjectFlow is a Python class that manages a project’s file structure and task tree. It is designed to be a drop-in solution for researchers who have a script that has grown too large and complex to manage. The ProjectFlow object is initialized with a directory, and it will create a file structure within that directory that is designed to be easy to navigate and understand. The ProjectFlow object also manages a task tree, which is a tree of tasks that need to be run in order to complete the project. The task tree is defined by the user, and the ProjectFlow object will manage the execution of the tasks, ensuring that they are run in the correct order and that any dependencies between tasks are satisfied.

To create a task tree, see example.py in docs/examples; a minimal sketch is shown below.
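As a rough sketch of the idea (assuming the add_task API shown later on this page, and p.execute() as the call that kicks off the tree, which is how hazelbean run files typically end; the filename and project directory below are illustrative only):

import os
import hazelbean as hb

def example_task(p):
    # Path definitions always run and should be trivially fast.
    p.example_output_path = os.path.join(p.cur_dir, 'example_output.txt')  # hypothetical filename
    if p.run_this:
        # Heavier work runs only when the task actually needs to run.
        with open(p.example_output_path, 'w') as f:
            f.write('hello')

if __name__ == '__main__':
    p = hb.ProjectFlow('my_project_dir')        # illustrative project root directory
    p.example_task = p.add_task(example_task)   # register the task in the tree
    p.execute()                                 # assumed entry point that runs the task tree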

Step 1: Substitute in a few pre-built Hazelbean functions

import hazelbean as hb

if __name__ == '__main__':
    p = hb.ProjectFlow(r'C:\Files\Research\cge\gtap_invest\projects\feedback_policies_and_tipping_points')

In a multi-file setup, the run.py will need to import the other scripts, such as main.py, e.g.:

import visualizations.main

The script file main.py can contain whatever code you like, but in particular it can include “task” functions. A task function, shown below, takes only p as an argument and returns p (potentially modified). It must also include a conditional (if p.run_this:) to separate what always runs just by nature of being in the task tree (and is assumed to run trivially fast, e.g., defining file paths) from what runs only conditionally (based on the task.run attribute, or optionally based on satisfying a completion-check function).

def example_task_function(p):
    """Fast function that creates several tiny geotiffs of gaussian-like kernels for later use in ffn_convolve."""

    if p.run_this:
        for i in range(10):  # placeholder for a computationally intensive loop
            print(i)

Important Non-Obvious Note

Importing the script will define function(s) that add “tasks”; these take the ProjectFlow object as an argument and return it after potential modification.

def add_all_tasks_to_task_tree(p):
    p.generated_kernels_task = p.add_task(example_task_function)
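In the run file, this adder function is then called before executing the tree. A hedged sketch of that wiring (p.execute() is assumed here as the call that runs the task tree, as in the standard run file referenced earlier; the project path is illustrative):

import hazelbean as hb
import visualizations.main

if __name__ == '__main__':
    p = hb.ProjectFlow(r'C:\Files\Research\my_project')   # illustrative project directory
    visualizations.main.add_all_tasks_to_task_tree(p)     # registers the tasks defined in main.py
    p.execute()                                           # assumed entry point that runs the task tree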

Iterating over many model assumptions

In many cases, such as a standard GTAPPy run, we will iterate over different aggregations and scenarios (now renamed counterfactuals). This is done as expected with code like this:


for aggregation_label in p.aggregation_labels:
     
    for experiment_label in p.experiment_labels:
        
        for n_years_counter, ending_year in enumerate(p.years):
            
            if n_years_counter == 0:
                starting_year = p.base_year
            else:
                starting_year = p.years[n_years_counter - 1]
                
            output_dir = p.get_path(os.path.join(aggregation_label, experiment_label, str(ending_year)))

But sometimes this becomes deeply nested and confusing. SEALS implements an API that reads these nested layers from a CSV. This is described more fully in the SEALS user guide.


for index, row in p.scenarios_df.iterrows():
    seals_utils.assign_df_row_to_object_attributes(p, row)
    
    if p.scenario_type != 'baseline':
                            
        for n_years_counter, ending_year in enumerate(p.years):

            if n_years_counter == 0:
                starting_year = p.base_year
            else:
                starting_year = p.years[n_years_counter - 1]
                
            current_run_dirs = os.path.join(p.exogenous_label, p.climate_label, p.model_label, p.counterfactual_label)
            output_dir = p.get_path(current_run_dirs, str(ending_year))
            expected_sl4_path = os.path.join(output_dir, p.counterfactual_label + '_Y' + str(ending_year) + '.sl4')

In this scenarios_df, which was loaded from scenarios_csv_path, the nested for loops are implied: each row holds a unique combination of the values (p.exogenous_label, p.climate_label, p.model_label, p.counterfactual_label) that would otherwise have been iterated over with nested loops as above. Now, however, we iterate just over scenarios_df rows. Within each row pass, the row's values are assigned to project-level attributes via seals_utils.assign_df_row_to_object_attributes(p, row), which is used instead of the nested for loops.
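As an illustration of what such a helper does (a plausible sketch, not the actual seals_utils implementation), it simply copies each column of the current row onto the project object:

def assign_df_row_to_object_attributes(p, row):
    # Copy each column value of this scenario row onto the ProjectFlow object,
    # so later code can read p.exogenous_label, p.climate_label, etc.
    for column_name, value in row.items():
        setattr(p, column_name, value)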

Creating scenarios spreadsheets

The scenarios CSV is written from code rather than downloaded: generating it from code keeps it up to date as the code changes quickly. The wrinkle is that the project attributes must first be initialized in code before the CSV can be written, as the example below shows.


    # If you want to run SEALS with the run.py file in a different directory (ie in the project dir)
    # then you need to add the path to the seals directory to the system path.
    custom_seals_path = None
    if custom_seals_path is not None: # G:/My Drive/Files/Research/seals/seals_dev/seals
        sys.path.insert(0, custom_seals_path)

    # SEALS will run based on the scenarios defined in a scenario_definitions.csv
    # If you have not run SEALS before, SEALS will generate it in your project's input_dir.
    # A useful way to get started is to run SEALS on the test data without modification
    # and then edit the scenario_definitions.csv to your project needs.
    # Some of the other test files use different scenario definition csvs 
    # to illustrate the technique. If you point to one of these 
    # (or any one CSV that already exists), SEALS will not generate a new one.
    # The available example files in the default_inputs include:
    # - test_three_scenario_defininitions.csv
    # - test_scenario_defininitions_multi_coeffs.csv
    
    p.scenario_definitions_path = os.path.join(p.input_dir, 'scenario_definitions.csv')

    # Set defaults and generate the scenario_definitions.csv if it doesn't exist.
    if not hb.path_exists(p.scenario_definitions_path):
        # There are several possibilities for what you might want to set as the default.
        # Choose accordingly by uncommenting your desired one. The set of
        # supported options are
        # - set_attributes_to_dynamic_default (primary one)
        # - set_attributes_to_dynamic_many_year_default
        # - set_attributes_to_default # Deprecated

        gtap_invest_utils.set_attributes_to_dynamic_gtap_default(p) # Default option


        # # Optional overrides for use in initial scenarios
        # p.aoi = 'RWA'

        # gtap_invest_utils.set_attributes_to_dynamic_default(p)
        # Once the attributes are set, generate the scenarios csv and put it in the input_dir.
        gtap_invest_utils.generate_gtap_invest_scenarios_csv_and_put_in_input_dir(p)
        p.scenarios_df = pd.read_csv(p.scenario_definitions_path)
    else:
        # Read in the scenarios csv and assign the first row to the attributes of this object (in order to set up additional
        # project attributes like the resolutions of the fine scale and coarse scale data)
        p.scenarios_df = pd.read_csv(p.scenario_definitions_path)

        # Because we've only read the scenarios file, set the attributes
        # to what is in the first row.
        for index, row in p.scenarios_df.iterrows():
            seals_utils.assign_df_row_to_object_attributes(p, row)
            break # Just get first for initialization.
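For reference, here is a hypothetical sketch of what a generator such as generate_gtap_invest_scenarios_csv_and_put_in_input_dir() might do once the defaults have been set on p (the attribute p.counterfactual_labels and the columns below are illustrative assumptions, not the real function's contents): build one row per counterfactual from the p attributes and write it to p.scenario_definitions_path.

import pandas as pd

def generate_scenarios_csv_sketch(p):
    # Build one row per counterfactual from attributes previously set on p.
    rows = []
    for counterfactual_label in p.counterfactual_labels:  # illustrative attribute
        rows.append({
            'exogenous_label': p.exogenous_label,
            'climate_label': p.climate_label,
            'model_label': p.model_label,
            'counterfactual_label': counterfactual_label,
        })
    pd.DataFrame(rows).to_csv(p.scenario_definitions_path, index=False)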

Task Format

ProjectFlow requires a consistent format for tasks. The following is an example of a task that creates a correspondence file from gtap11 regions to gtapaez11 regions. The task itself is defined as a function that takes a p object as its argument. This p object is a ProjectFlow object that contains all the project-level variables, manages folders and files, and manages tasks and parallelization. p also includes documentation, which will be written directly into the task directory.

Also note that any project-level attributes defined between the start of the function and the if p.run_this: block are the “project-level variables” that are fair game for use in other tasks. These paths are critical for high performance because they enable quick-skipping of completed tasks and determination of which parts of the task tree need rerunning.

Tasks should be named as nouns (this breaks Python PEP 8 style) referencing what will be stored in the task's output dir. This might feel awkward at first, but it means that the resultant file structure is easier for a non-EE outsider to interpret.


def gtap_aez_seals_correspondences(p):
    p.current_task_documentation = """
    Create correspondence CSVs from ISO3 countries to GTAPv11 160
    regions, and then to gtapaezv11 50ish regions, also put the classification
    for seals simplification and luh.  
    """
    p.gtap11_region_correspondence_input_path = os.path.join(p.base_data_dir, 'gtappy', 'aggregation_mappings', 'GTAP-ctry2reg.xlsx')
    p.gtap11_region_names_path = os.path.join(p.base_data_dir, 'gtappy', 'aggregation_mappings', 'gtap11_region_names.csv')
    p.gtap11_gtapaez11_region_correspondence_path = os.path.join(p.base_data_dir, 'gtappy', 'aggregation_mappings', 'gtap11_gtapaez11_region_correspondance.csv')    

    if p.run_this:
        
        "logic here"

Automatic Directory Organization via Tasks

Hazelbean automatically defines directory organization as a function of the task tree. When the ProjectFlow object is created, it takes a directory as its only required input. This directory defines the root of the project. The other directory that needs to be referenced is the base_data_dir. When you initialize the p object, it notes this:

Created ProjectFlow object at C:\Users\jajohns\Files\gtap_invest\projects\cwon from script C:\Users\jajohns\Files\gtap_invest\gtap_invest_dev\gtap_invest\run_cwon.py with base_data set at C:\Users\jajohns\Files/base_data

In the run file, the following line generates the task tree:

gtap_invest_initialize_project.build_extract_and_run_aez_seals_task_tree(p)

This points to a builder function in the initialize file.
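A minimal, hedged sketch of what such a builder might contain (the task functions named here, and the parent= argument used to nest them, are illustrative assumptions rather than the real builder's contents):

def build_extract_and_run_aez_seals_task_tree(p):
    # Register each task function; nesting is expressed by passing the parent task.
    p.econ_visualization_task = p.add_task(econ_visualization)
    p.econ_lcovercom_task = p.add_task(econ_lcovercom, parent=p.econ_visualization_task)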

This would generate the following task tree:
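For the two tasks sketched above, the task directories would be laid out roughly like this (an illustration; the actual printed tree format may differ):

<project_root>
    econ_visualization
        econ_lcovercom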

Two conventions are especially useful within this task tree.

  1. Within the function that defines a task, p.cur_dir points to the directory of that task. For instance, in the code of the last task defined in the example above (econ_lcovercom), you could reference p.cur_dir and it would point to <project_root>/econ_visualization/econ_lcovercom.
  2. Outside of a given function’s code, you can still refer to paths that were defined within that function’s code, but (because you are outside the function) they get a new reference. Using the example above, you could reference the same directory with p.econ_lcovercom_dir, where the p attribute is named exactly as <function_name>_dir (see the snippet below).
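A short illustration of both conventions (the filename is hypothetical):

# Inside the econ_lcovercom task function, refer to the task's own directory:
output_path = os.path.join(p.cur_dir, 'lcover_comparison.csv')  # hypothetical filename

# From any other task, or from the run file after the tree is built:
same_dir = p.econ_lcovercom_dir  # <project_root>/econ_visualization/econ_lcovercom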

All of this setup enables another useful feature: automatic management of file generation, storage, and downloading. This is done via the hazelbean function:


useful_path = hb.get_path(relative_path)

This function will iteratively search multiple locations and return the most “useful” one. By default, relative_path is first joined with p.cur_dir; if that file exists, the joined path is returned. If not, it checks the next location, p.input_dir, and then p.base_data_dir. If the file is not found anywhere, it will attempt to download it from google cloud (NYI) and save it in p.cur_dir. If it is not available to download from google cloud, the path is treated as something that will be generated within the task, so get_path returns the first option above, namely relative_path joined with p.cur_dir.
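Conceptually, the search order described above looks like the following simplified sketch (this is not hazelbean's actual implementation):

import os

def get_path_sketch(p, relative_path):
    # Check each candidate directory in priority order and return the first existing file.
    for possible_dir in [p.cur_dir, p.input_dir, p.base_data_dir]:
        candidate = os.path.join(possible_dir, relative_path)
        if os.path.exists(candidate):
            return candidate
    # (An attempt to download from google cloud into p.cur_dir would go here; NYI.)
    # Otherwise, assume the task itself will generate the file in its own directory.
    return os.path.join(p.cur_dir, relative_path)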

One important use-case that needs explaining is tasks that generate files which will eventually be placed in the base_data_dir. The goal is to generate the file easily in the appropriate task_dir within the intermediate directory, but then be able to copy it, with the exact same relative path, to the base_data_dir and have it still be found by p.get_path(). To do this, choose a path name relative to the task's cur_dir that matches the desired path relative to the base_data_dir. So, for example, we include 'gtappy', 'aggregation_mappings' at the beginning of the relative path in the task_dir so that it matches the corresponding location in the base data, e.g. <base_data_dir>/gtappy/aggregation_mappings/gadm_adm0.gpkg:


template_path = p.get_path(os.path.join('gtappy', 'aggregation_mappings', 'gadm_adm0.gpkg')) 

It can be hard to decide what counts as a base-data-generating task, but generally, if the file will not be used by other projects, you should not treat it as one. Instead, just make the path relative to the cur_dir (or whatever makes sense), as below:


output_path = p.get_path(os.path.join(aggregation_label + '_' + experiment_label + '_' + header + '_stacked_time_series.csv'))

One additional exception to the above is if you are calling get_path outside of a task/task tree. One common example is in the run file before you build the task tree. In this case, the default directories will not make sense, so you need to specify them manually, as here:


p.countries_iso3_path = p.get_path(os.path.join('cartographic', 'gadm', 'gadm_adm0_10sec.gpkg'), possible_dirs=[p.input_dir, p.base_data_dir])

Validation of files

ProjectFlow is designed to run very fast while simultaneously validating that everything is approximately correct. It does this by checking for the existence of files (often combined with hb.get_path()). For example:


p.gadm_r263_gtapv7_r251_r160_r50_correspondence_vector_path = p.get_path(os.path.join('gtap_invest', 'region_boundaries', 'gadm_r263_gtapv7_r251_r160_r50_regions.gpkg'))
if not hb.path_exists(p.gadm_r263_gtapv7_r251_r160_r50_correspondence_vector_path):
    hb.log('Creating ' + p.gadm_r263_gtapv7_r251_r160_r50_correspondence_vector_path)
    # ... computationally expensive thing here ...

ProjectFlow very carefully defines whether or not you should run something based on the existence of specific paths. Usually this is just checking for each written path and only executing the code if it’s missing, but in some cases where lots of files are created, it’s possible to take the shortcut of just checking for the existence of the last-created path.
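For example, a task that writes one file per year might gate its whole body on the last year's output only (a sketch; p.years and write_yearly_output() are illustrative assumptions):

last_output_path = os.path.join(p.cur_dir, 'output_' + str(p.years[-1]) + '.tif')
if not hb.path_exists(last_output_path):
    for year in p.years:
        write_yearly_output(p, year)  # hypothetical helper that writes output_<year>.tif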

Eliminating redundant calculation across projects

If you have a time-consuming task that, for example, writes to


big_file_path = hb.get_path('lulc', 'esa', 'seals7', 'convolutions', '2017', 'convolution_esa_seals7_2017_cropland_gaussian_5.tif')

In this example, suppose you needed to create this file via your create_convolutions() task. When you first do this, it obviously won’t exist yet, so get_path() will join that relative path onto the p.cur_dir location. If you run the ProjectFlow again, it will see the file is there and instantly skip recalculating it.
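If the file is genuinely reusable across projects, one approach (a sketch; this copy step is a manual convention, not something this page documents ProjectFlow doing automatically) is to copy the finished file into the base_data_dir under the same relative path, so that get_path() in any project finds it there:

import os
import shutil

relative_path = os.path.join('lulc', 'esa', 'seals7', 'convolutions', '2017',
                             'convolution_esa_seals7_2017_cropland_gaussian_5.tif')
source_path = os.path.join(p.cur_dir, relative_path)           # where the task generated it
promoted_path = os.path.join(p.base_data_dir, relative_path)   # same relative path in base data
os.makedirs(os.path.dirname(promoted_path), exist_ok=True)
shutil.copyfile(source_path, promoted_path)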

In addition to the 5 repos plus the EE repo, there is a managed base data set stored in the same location.

A ProjectFlow object must have a base_data_dir set (I think…). This is because p.get_path() will look in this folder for files and/or will download them to it.

Python Tips and Conventions

For large files that take a long time to load, use a string->dataframe/dataset substitution as below. Make a LOCAL variable to contain the loaded object, and initially assign it to the correct project-level path string. In subsequent usages, check its type: if it is still a string, the object hasn’t been loaded yet, so load it then. (I’m debating making this a project-level-variable trick.)


gadm = p.gadm_adm0_vector_input_path    

# Just load it on first pass
if isinstance(gadm, str):
    gadm = hb.read_vector(p.gadm_adm0_vector_input_path)
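One possible form of the project-level-variable variant mentioned above (a sketch of a design choice, not an established hazelbean convention) is to store the loaded object back onto p so later code also skips the reload:

# First use anywhere: replace the path string on p with the loaded object.
if isinstance(p.gadm_adm0_vector_input_path, str):
    p.gadm_adm0_vector_input_path = hb.read_vector(p.gadm_adm0_vector_input_path)

# Subsequent uses get the already-loaded object directly.
gadm = p.gadm_adm0_vector_input_path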