Zeta Workflow 04: Exception Handling Walkthrough

The released Zeta Workflow code does not correctly handle exceptions. This walkthrough describes the behavior after my modifications.

The basic issue was that both start() and resume(), when using the Database Tie-In component, issue beginTransaction() to the database engine. The workflows can throw exceptions which remain uncaught. Because the exception is not caught within the workflow code, the transaction is never closed with either rollback or commit. With an SQLite database, unit tests then fail, complaining of a locked table.

Within start() and resume(), we now catch the exceptions so that we can commit or roll back the transaction before (possibly) re-throwing the exception.
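
The pattern can be sketched in isolation. This is a minimal illustration, not the library code: FakeTransaction and runInTransaction() are hypothetical names standing in for the Database Tie-In connection and for the body of start().

```php
<?php
// Hypothetical stand-in for the tied database connection; it only records calls.
class FakeTransaction
{
    public $log = array();
    public function beginTransaction() { $this->log[] = 'begin'; }
    public function commit()           { $this->log[] = 'commit'; }
    public function rollback()         { $this->log[] = 'rollback'; }
}

// Runs $work inside a transaction and always closes the transaction.
// Returns the caught exception (or null) so the caller may re-throw it.
function runInTransaction( FakeTransaction $tx, callable $work )
{
    $caught = null;
    $tx->beginTransaction();
    try {
        $work();
    } catch ( \Exception $e ) {
        $caught = $e; // remember the failure, but do not leave the transaction open
    }
    $tx->commit();
    return $caught;
}

$tx = new FakeTransaction();
$caught = runInTransaction( $tx, function () {
    throw new \RuntimeException( 'node failed' );
} );
```

Even though the callback throws, the log ends with a commit, so no table is left locked.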


Start

We cannot allow start() to re-throw the exception, because we depend on start() to give us the execution id. If an exception was thrown, we can be sure that the workflow did not run all the way through. Therefore, if we are to resume that execution, we need the execution id.

I therefore changed the start() behavior.

If an exception is thrown during start(), start() catches it, sets the variable ‘exception’ to the exception message, and suspends execution. start() then returns the execution id.
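
From the caller’s side, the behavior looks roughly like the sketch below. MiniExecution is a hypothetical miniature written for this illustration, with a made-up id of 42; it is not the Zeta Workflow execution class.

```php
<?php
// Hypothetical miniature of an execution; not the Zeta Workflow class.
class MiniExecution
{
    private $id = 42;             // pretend doStart() assigned this id
    private $suspended = false;
    private $variables = array();
    private $body;

    public function __construct( callable $body )
    {
        $this->body = $body;
    }

    public function start()
    {
        try {
            call_user_func( $this->body );
        } catch ( \Exception $e ) {
            // Record the failure and park the execution in a known state.
            $this->variables['exception'] = $e->getMessage();
            $this->suspended = true;
        }
        // Hand back the id only when the execution is suspended.
        return $this->suspended ? $this->id : null;
    }

    public function getVariable( $name )
    {
        return isset( $this->variables[$name] ) ? $this->variables[$name] : null;
    }
}

$execution = new MiniExecution( function () {
    throw new \RuntimeException( 'Exception in slide provider' );
} );
$id      = $execution->start();                     // no exception escapes
$message = $execution->getVariable( 'exception' );  // the failure is reported here
```

The caller gets an id it can later resume, and learns about the failure by checking the ‘exception’ variable.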

Here is start() in zetacomponents/workflow/src/interfaces/execution.php.

    public function start( $parentId = null )
    {
        if ( $this->workflow === null )
        {
            throw new ezcWorkflowExecutionException(
              'No workflow has been set up for execution.'
            );
        }

        $this->cancelled = false;
        $this->ended     = false;
        $this->resumed   = false;
        $this->suspended = false;

        $this->beginTransaction();
        try {
            $this->doStart( $parentId );
            $this->loadFromVariableHandlers();

            foreach ( $this->plugins as $plugin )
            {
                $plugin->afterExecutionStarted( $this );
            }

            // Start workflow execution by activating the start node.
            $this->workflow->startNode->activate( $this );

            // Continue workflow execution until there are no more
            // activated nodes.
            $this->execute();
        } catch(\Exception $e) {
            $this->setVariable('exception', $e->getMessage());
            if(!($this->isSuspended())) {
                $this->suspend();
            }
        }
        $this->commit();

        // Return execution ID if the workflow has been suspended.
        if ( $this->isSuspended() )
        {
            return (int)$this->id;
        }
    }

beginTransaction(), rollback(), and commit() are empty do-nothing methods in this parent class. When we are using the Database Tie-In package, which we are, those methods issue the corresponding database engine command.

Let’s walk through start().

See line 1. When Zeta Workflow starts a sub workflow, $parentId will contain the id of the parent workflow. When we start the parent workflow, we call start() without any parameter.

Lines 3-8 are a guard clause ensuring we have a workflow to execute.

Lines 10-14 initialize the execution state. We can query the state with isSuspended(), etc. However, from outside the workflow, you can only query the workflow execution at a stopped point. Service Objects, Plugins, and Listeners have access to the workflow while it is actively running.

Line 15 begins the database transaction. With a non-interactive workflow, it’s a do-nothing function.

Lines 16-36 are our try/catch block. This is a new code structure which we intend to submit to the open source community as a pull request. We cannot allow start() to get away with an uncaught exception, because we need it to return the execution id.

Let’s first look at exception processing, the catch block lines 31-35. We pass back the exception message as the execution variable ‘exception’. This might not be terribly useful in the case of a sub workflow. We suspend the workflow (unless it’s already suspended). In other words, we should be leaving the workflow in a known state, correctly persisted to the database.

Peek ahead to line 37. We unconditionally commit the database transaction. We got the execution to a known state. That’s what we want, so commit the transaction.

start() processing does not do a rollback. We could have executed any number of nodes prior to hitting the exception. Any of those steps (such as Laravel creating a new user) might be non-reversible. We therefore “latch” the current state, whatever it is, and commit.

More on transactions in a moment.

Lines 17-30, the try block, run the workflow.

doStart()

Line 17 creates a database record for this workflow execution, and sets $this->id to the id of that database record. See zetacomponents/workflow-database-tiein/src/execution.php:

    protected function doStart( $parentId )
    {

        $query = $this->db->createInsertQuery();

        $query->insertInto( $this->db->quoteIdentifier( $this->options['prefix'] . 'execution' ) )
              ->set( $this->db->quoteIdentifier( 'workflow_id' ), $query->bindValue( (int)$this->workflow->id ) )
              ->set( $this->db->quoteIdentifier( 'execution_parent' ), $query->bindValue( (int)$parentId ? (int)$parentId : NULL ) )
              ->set( $this->db->quoteIdentifier( 'execution_started' ), $query->bindValue( time() ) )
              ->set( $this->db->quoteIdentifier( 'execution_variables' ), $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->variables ) ) )
              ->set( $this->db->quoteIdentifier( 'execution_waiting_for' ), $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->waitingFor ) ) )
              ->set( $this->db->quoteIdentifier( 'execution_threads' ), $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->threads ) ) )
              ->set( $this->db->quoteIdentifier( 'execution_next_thread_id' ), $query->bindValue( (int)$this->nextThreadId ) );

        $statement = $query->prepare();
        $statement->execute();

        $this->id = (int)$this->db->lastInsertId( 'execution_execution_id_seq' );
    }

loadFromVariableHandlers()

Return to start(), line 18, loadFromVariableHandlers(). The Variable Handler documentation is here: http://ezcomponents.org/docs/api/latest/Workflow/ezcWorkflowVariableHandler.html. The easiest way to understand how the Variable Handlers work is to look at what loadFromVariableHandlers() does:

    /**
     * Loads data from variable handlers and
     * merge it with the current execution data.
     */
    protected function loadFromVariableHandlers()
    {
        foreach ( $this->workflow->getVariableHandlers() as $variableName => $className )
        {
            $object = new $className;
            $this->setVariable( $variableName, $object->load( $this, $variableName ) );
        }
    }

So, the Variable Handlers are a way to pre-populate the execution variables before running the workflow. We could use this, for example, for loading the current user state into the workflow.

saveToVariableHandlers()

There is a corresponding Save to Variable Handlers:

    /**
     * Saves data to execution data handlers.
     */
    protected function saveToVariableHandlers()
    {
        foreach ( $this->workflow->getVariableHandlers() as $variableName => $className )
        {
            if ( isset( $this->variables[$variableName] ) )
            {
                $object = new $className;
                $object->save( $this, $variableName, $this->variables[$variableName] );
            }
        }
    }
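
To make the load/save pair concrete, here is a small self-contained sketch. VariableHandlerSketch and InMemoryUserStateHandler are hypothetical names; the interface only mirrors the load() and save() calls visible in the two listings above, and the static array stands in for a real store such as a Laravel model.

```php
<?php
// Stand-in for the handler contract implied by the listings:
// load( $execution, $variableName ) and save( $execution, $variableName, $value ).
interface VariableHandlerSketch
{
    public function load( $execution, $variableName );
    public function save( $execution, $variableName, $value );
}

// Hypothetical handler that keeps values in a static array.
class InMemoryUserStateHandler implements VariableHandlerSketch
{
    public static $store = array( 'userState' => 'registered' );

    public function load( $execution, $variableName )
    {
        return isset( self::$store[$variableName] ) ? self::$store[$variableName] : null;
    }

    public function save( $execution, $variableName, $value )
    {
        self::$store[$variableName] = $value;
    }
}

// Same shape as loadFromVariableHandlers(): variable name => class name.
$handlers  = array( 'userState' => 'InMemoryUserStateHandler' );
$variables = array();
foreach ( $handlers as $variableName => $className ) {
    $object = new $className;
    $variables[$variableName] = $object->load( null, $variableName );
}
$loaded = $variables['userState'];

// Same shape as saveToVariableHandlers(), after the workflow changed the value.
$variables['userState'] = 'verified';
foreach ( $handlers as $variableName => $className ) {
    if ( isset( $variables[$variableName] ) ) {
        $object = new $className;
        $object->save( null, $variableName, $variables[$variableName] );
    }
}
```

Note that a new handler object is created for every load and every save, so a handler cannot keep state in its own instance between calls.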

suspend()

However, if we look at start(), we don’t see any saveToVariableHandlers() call. That call happens when we suspend() the workflow or end() the workflow:

    /**
     * Suspends workflow execution.
     *
     * This method is usually called by the execution environment when there are no
     * more activated nodes that can be executed. This is commonly the case with input
     * nodes waiting for input.
     *
     * This method calls doSuspend() before calling saveToVariableHandlers() allowing
     * reimplementations to save variable and node information.
     *
     * @ignore
     */
    public function suspend()
    {
        $this->cancelled = false;
        $this->ended     = false;
        $this->resumed   = false;
        $this->suspended = true;

        $keys     = array_keys( $this->variables );
        $count    = count( $keys );
        $handlers = $this->workflow->getVariableHandlers();

        for ( $i = 0; $i < $count; $i++ )
        {
            if ( isset( $handlers[$keys[$i]] ) )
            {
                unset( $this->variables[$keys[$i]] );
            }
        }

        $this->doSuspend();
        $this->saveToVariableHandlers();

        foreach ( $this->plugins as $plugin )
        {
            $plugin->afterExecutionSuspended( $this );
        }
    }

While we’re here, take a close look at suspend(). We’ll tackle end() next, and then get back to start().

Lines 15-18 set our execution-state flags.

Lines 20-22 collect the list of variable names, count the number of variables so as to control our loop, and obtain the list of variable handlers.

Lines 24-30 unset any variables which are being managed by a variable handler. This is an important point to note! If we set a variable handler, meaning that we have provided our own code to manage that variable, that variable becomes invisible outside the workflow. In the various examples, we have been using getVariable('slide') to check which slide is set. If ‘slide’ were registered with a variable handler, getVariable('slide') would return nothing useful. (The wrapper returns null for this case; Zeta Workflow throws an exception.)
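
The effect of that loop is easy to see on plain arrays. In this sketch the variable names and the handler class name are made up for illustration; the loop itself has the same shape as the one in suspend().

```php
<?php
// Hypothetical data: 'slide' is a plain variable, 'userState' has a handler.
$variables = array( 'slide' => 'Slide_1', 'userState' => 'verified' );
$handlers  = array( 'userState' => 'SomeHandlerClass' ); // never instantiated here

// Drop every variable that a handler manages.
$keys  = array_keys( $variables );
$count = count( $keys );
for ( $i = 0; $i < $count; $i++ ) {
    if ( isset( $handlers[$keys[$i]] ) ) {
        unset( $variables[$keys[$i]] );
    }
}
// Only the unmanaged variable 'slide' remains in $variables.
```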

Line 32, doSuspend(), saves the execution state to the tied database.

Line 33 invokes our variable handlers. This could be where we use Laravel models to save our user state.

Lines 35-38 pass the execution to each plugin (if any) for afterExecutionSuspended() processing.

end()

Let’s look at end().

    /**
     * Ends workflow execution with the node $endNode.
     *
     * End nodes must call this method to end the execution.
     *
     * @param ezcWorkflowNode $node
     * @ignore
     */
    public function end( ezcWorkflowNode $node = null )
    {
        if ( !$this->cancelled )
        {
            if ( $node !== null )
            {
                foreach ( $this->plugins as $plugin )
                {
                    $plugin->afterNodeExecuted( $this, $node );
                }
            }

            $this->ended     = true;
            $this->resumed   = false;
            $this->suspended = false;

            $this->doEnd();
            $this->saveToVariableHandlers();

            if ( $node !== null )
            {
                $this->endThread( $node->getThreadId() );

                foreach ( $this->plugins as $plugin )
                {
                    $plugin->afterExecutionEnded( $this );
                }
            }
        }
        else
        {
            foreach ( $this->plugins as $plugin )
            {
                $plugin->afterExecutionCancelled( $this );
            }
        }
    }

The End Node

The context is different here. We’re running inside execute(). (Go back to line 30 of the start() listing, above, to orient yourself.) An individual node, specifically an End Node, is calling end(). See zetacomponents/workflow/src/nodes/end.php:

class ezcWorkflowNodeEnd extends ezcWorkflowNode
{

    /**
     * Ends the execution of this workflow.
     *
     * @param ezcWorkflowExecution $execution
     * @return boolean true when the node finished execution,
     *                 and false otherwise
     * @ignore
     */
    public function execute( ezcWorkflowExecution $execution )
    {
        $execution->end( $this );

        return parent::execute( $execution );
    }
}

For the record, end() is also called during cancel() processing.

How Variable Handlers Work

We seem to have taken ourselves pretty far down the rabbit trail. Remember, our purpose is to understand how the variable handlers work.

See line 26. When we hit the end node, we save our state with the variable handlers.

However, see line 11. If we are doing cancel() processing, we do NOT save state with the variable handlers. When we hit cancel(), we throw everything away. If you want to verify this, cancel() is in the code directly above end(). cancel() does call end(), and does NOT itself call the variable handlers.

So, the moral of the story is that the variable handlers do their load() prior to starting execution, and do their save() on suspend() or end(), but not on cancel().

Plugins

If you’ve been carefully watching the code as we walk through Zeta Workflow, you’ve seen a number of places where we run the list of plugins. Just be aware that we might want to use this feature as our way of communicating with Laravel!
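
As a rough idea of what such a plugin could look like, here is a stand-alone sketch. RecordingPlugin is a hypothetical class; a real plugin would extend the library’s plugin base class, and only hook names that appear in the listings above are used.

```php
<?php
// Hypothetical plugin that records lifecycle events instead of talking to Laravel.
class RecordingPlugin
{
    public $events = array();

    public function afterExecutionStarted( $execution )   { $this->events[] = 'started'; }
    public function afterExecutionSuspended( $execution ) { $this->events[] = 'suspended'; }
    public function afterExecutionResumed( $execution )   { $this->events[] = 'resumed'; }
    public function afterExecutionEnded( $execution )     { $this->events[] = 'ended'; }
}

$plugins   = array( new RecordingPlugin() );
$execution = null; // placeholder; the real hooks receive the running execution

// Same dispatch shape as the foreach loops in start() and suspend().
foreach ( array( 'afterExecutionStarted', 'afterExecutionSuspended' ) as $hook ) {
    foreach ( $plugins as $plugin ) {
        $plugin->$hook( $execution );
    }
}
```

Because each hook receives the execution, a plugin is a natural place to forward events to application code.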

Transaction Design

You will have noted that the entire workflow runs inside a database transaction. This might not be a good thing, because the individual steps might themselves use database transactions.

This is where things get tricky. However, the tricky part becomes simple! Here’s the deal.

We have a “Laravel” connection to the database, and we have a “Zeta Workflow tied database” connection to the (presumably same) database. The way transactions are supposed to work is that one connection can’t see the other connection’s changes until the transaction commits.

MySQL (InnoDB) normally runs in autocommit mode, which means every database statement result becomes visible to the other connection upon completion.

I believe (but have not tested) that this comes down to a simple separation. The Wrapper/Workflow part runs in a single transaction on one connection, and the Laravel code runs however it cares to on the other connection. The one thing the Laravel code can’t do is peek at the database information for a running execution.

Looking the other direction, the Laravel code runs either:

  • Outside the workflow execution,
  • Inside a plugin or listener, and is therefore self-contained with respect to the workflow, or
  • Inside a service object, which is again self-contained with respect to the workflow

So, I believe (but have not tested), the transaction mechanism should work fine.

Resume

Resume processing has two parts:

  1. Process input variables
  2. Execute the workflow

If the input variables do not pass validation, or anything else goes wrong during input processing, we roll back and throw an exception.

Assuming we got past input validation, we execute the workflow. If we see an exception at this point, we handle things a bit differently:

  1. Set the variable ‘exception’ to the exception message.
  2. Suspend the workflow.
  3. Re-throw the exception.

We already know the execution id, so it doesn’t matter if resume throws an exception at this point.

On the other hand, if everything went well (no exceptions), we return the execution id if suspended, or null otherwise.

Resume Walk Through

Here is the actual resume() code in zetacomponents/workflow/src/interfaces/execution.php.

    public function resume( array $inputData = array() )
    {
        if ( $this->id === null )
        {
            throw new ezcWorkflowExecutionException(
              'No execution id given.'
            );
        }

        $this->cancelled = false;
        $this->ended     = false;
        $this->resumed   = true;
        $this->suspended = false;

        $this->beginTransaction();
        try {
            $this->doResume();
            $this->loadFromVariableHandlers();

            $errors = array();

            foreach ( $inputData as $variableName => $value )
            {
                if ( isset( $this->waitingFor[$variableName] ) )
                {
                    if ( $this->waitingFor[$variableName]['condition']->evaluate( $value ) )
                    {
                        $this->setVariable( $variableName, $value );
                        unset( $this->waitingFor[$variableName] );
                    }
                    else
                    {
                        $errors[$variableName] = (string)$this->waitingFor[$variableName]['condition'];
                    }
                }
            }
        } catch (\Exception $e) {
            $errors['caught exception'] = $e->getMessage();
        }

        if ( !empty( $errors ) )
        {
            $this->rollback();
            throw new ezcWorkflowInvalidInputException( $errors );
        }

        try {
            foreach ( $this->plugins as $plugin )
            {
                $plugin->afterExecutionResumed( $this );
            }

            $this->execute();
        } catch (\Exception $e) {
            $this->setVariable('exception', $e->getMessage());
            if(!($this->isSuspended())) {
                $this->suspend();
            }
            $this->commit();
            throw $e;
        }
        $this->commit();

        // Return execution ID if the workflow has been suspended.
        if ( $this->isSuspended() )
        {
            return $this->id;
        }
    }

Line 1 shows that we can pass input data to resume(). As we will see with line 24, all input data are ignored unless the workflow is specifically waiting for that variable by name.

Lines 3-8 are the guard clause ensuring we have an execution id.

Lines 10-13 change the execution state from suspended to resumed.

Line 15 begins the database transaction (assuming we are using the tied database, which we are). This means the rest of this method must consist of try/catch blocks guaranteeing that we close out the transaction with either rollback or commit.

Lines 16-39, our first try/catch block, process the input parameters supplied with the resume() call.

Line 17, doResume(), is now an empty method that does nothing.

Line 18, loadFromVariableHandlers(), pre-populates the execution variables prior to processing the input which might alter those variables.

Lines 20-36 process the input variables. Line 24 ignores the variable unless the execution (i.e., an Input Node) is awaiting that specific variable. Line 26 validates the input value per the Input Node’s criteria. If valid, lines 28-29 set the variable and clear the “waiting for this variable” flag. If the variable did not pass validation, line 33 flags the error and line 44 will throw an Invalid Input Exception.
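
The input loop can be tried out on its own. In this sketch IsPositiveInteger is a hypothetical condition class offering the evaluate() and string-cast behavior that resume() relies on, and the variable names are made up.

```php
<?php
// Hypothetical condition with the two operations resume() uses.
class IsPositiveInteger
{
    public function evaluate( $value )
    {
        return is_int( $value ) && $value > 0;
    }

    public function __toString()
    {
        return 'is positive integer';
    }
}

$waitingFor = array( 'age' => array( 'condition' => new IsPositiveInteger() ) );
$inputData  = array( 'age' => -5, 'ignored' => 'not awaited' );
$variables  = array();
$errors     = array();

foreach ( $inputData as $variableName => $value ) {
    // Input is ignored unless the execution is waiting for that name.
    if ( isset( $waitingFor[$variableName] ) ) {
        if ( $waitingFor[$variableName]['condition']->evaluate( $value ) ) {
            $variables[$variableName] = $value;
            unset( $waitingFor[$variableName] );
        } else {
            $errors[$variableName] = (string)$waitingFor[$variableName]['condition'];
        }
    }
}
```

Here ‘ignored’ is silently dropped, while ‘age’ fails validation, stays in the waiting list, and lands in $errors.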

If for any reason an exception gets thrown in the input processing, the caught exception’s message is added to the list of errors at line 38.

Lines 41-45 handle invalid input. We roll back the transaction and throw an exception. In short, if there was an input validation error, the rollback leaves the execution as if resume() had never been called at all.

Lines 47-61 execute the workflow. Lines 48-51 notify plugins that execution is resuming. Line 53 then runs the workflow.

Line 54 catches any exception thrown by any plugin or executing node. Line 55 sets the execution variable ‘exception’ to the exception message. Lines 56-58 suspend the execution, getting us to a known state (unless already suspended). Line 59 commits the transaction, nailing down that known state. We then re-throw the exception.

This means that resume() should be run inside a try/catch. It can roll back and throw an invalid input exception, or it can commit and re-throw whatever exception the plugin or node raised.
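
Calling code might distinguish the two cases along these lines. This is only a sketch: InvalidInputSketch and NodeFailureSketch are hypothetical stand-ins for the real exception classes, and resumeSafely() is a made-up helper that takes the resume call as a closure.

```php
<?php
// Stand-ins for the two failure modes; the names are hypothetical.
class InvalidInputSketch extends \Exception {}
class NodeFailureSketch extends \Exception {}

// Hypothetical helper: runs the resume call and reports what happened.
function resumeSafely( callable $resume )
{
    try {
        $id = $resume();
        return $id === null ? 'ended' : 'suspended';
    } catch ( InvalidInputSketch $e ) {
        // Rolled back: the execution is as it was before the call.
        return 'invalid input';
    } catch ( \Exception $e ) {
        // Committed: the execution is suspended with 'exception' set.
        return 'failed: ' . $e->getMessage();
    }
}

$a = resumeSafely( function () { throw new InvalidInputSketch( 'bad value' ); } );
$b = resumeSafely( function () { throw new NodeFailureSketch( 'node blew up' ); } );
$c = resumeSafely( function () { return null; } );
$d = resumeSafely( function () { return 7; } );
```

The more specific catch comes first, so an invalid input error is never mistaken for a node failure.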

If things run normally (no exceptions), line 62 commits the database transaction. Lines 65-68 return the execution id if suspended, or null otherwise.

Exception Unit Test

Since workflow exception behavior is similar to failure behavior, we can quickly design our unit test. The test is tests/rollback/slideExceptionTest.php.

 * Verify workflow behavior upon node exception.
 *
 * If workflow start were to throw an exception back to us, we would not have
 * the execution id. So, we need to suspend and return the id.
 *
 * The workflow:
 * * Start
 * * Slide 1
 * * Exception 1
 * * Slide 2
 * * End
 *
 * The test hook in SlideProvider is conditioned such that the first two times
 * through it throws an exception, and the following times through it returns
 * 'true' allowing the workflow to proceed to the next node.
 *
 * Test expectations:
 * * 01. start() returns execution id
 * * 02. start() sets variable 'exception'
 * * 03. start() sets variable 'exception' with exception message /Exception/
 * * 04. start() sets slide to exception1
 * * 05. One resume() throws SlideProviderException
 * * 06. One resume() sets slide to exception1
 * * 07. Two resumes (try/catch first one) sets slide to slide2
 * * 08. Two resumes returns resume id === null

To get started, we set up the workflow. This code is nearly identical to the Fails test we already studied.

class slideExceptionTest extends \StarTribune\Workflow\Tests\DbTestCase {
    public $wrapper;
    public $slides;
    public $main = 'main';
    public function setUp() {
        parent::setUp();
        if(preg_match('/memory/', $this->dbenvironment)) {
            $this->markTestSkipped("Not propagating migration into tiein connection");
        }
        $this->wrapper = \App::make('StarTribune\Workflow\Wrapper\ZetaComponentsInterface');
        $this->registerSlides();
        $this->buildWorkflow();
    }
    public function registerSlides() {
        $this->slides = array(
            'slide1'     => 'Slide_1',
            'slide2'     => 'Slide_2',
            'fail1'      => 'Fail_1',
            'exception1' => 'Exception_1',
            );
        $this->wrapper->createSlides($this->slides);
    }
    public function buildWorkflow() {
        $this->wrapper->createWorkflow($this->main);
        $this->wrapper->addToStart('slide1');
        $this->wrapper->addSlideOutNode('slide1', 'exception1');
        $this->wrapper->addSlideOutNode('exception1',  'slide2');
        $this->wrapper->endHere('slide2');
        $this->wrapper->saveWorkflow();
    }
    public function verifySlide($slideName, $msg = '') {
        $actual = $this->wrapper->getVariable('slide');
        $expected = $this->slides[$slideName];
        $this->assertEquals($expected, $actual, $msg);
    }
    public function startWorkflow() {
        $this->wrapper->loadByName($this->main);
        return $this->wrapper->start();
    }

01. Verify workflow start does not throw exception and returns execution id

    public function test01Start() {
        $startid = $this->startWorkflow();
        $this->assertTrue($startid > 0);
    }

For the above test to work, we needed to change start() so that it caught the exception and returned the execution id. We wrote the test first, watched it fail, changed the start() code, and watched the test pass.

We took a test-first approach with resume() as well. Prior to my changes, throwing an exception in resume() meant an sqlite failure due to the locked database table.

Let’s continue with the tests validating our expected exception behavior.

02. Verify workflow start sets variable ‘exception’.

    public function test02StartVariable() {
        $startid = $this->startWorkflow();
        $exception = $this->wrapper->getVariable('exception');
        $this->assertTrue(strlen($exception) > 0);
    }

03. start() sets variable ‘exception’ with exception message /Exception/

    public function test03() {
        $startid = $this->startWorkflow();
        $exception = $this->wrapper->getVariable('exception');
        $this->assertRegExp('/Exception/', $exception);
    }

04. start() sets slide to exception1. Since we throw the exception after setting the slide variable, we expect to see that the slide variable was in fact persisted.

    public function test04() {
        $startid = $this->startWorkflow();
        $this->verifySlide('exception1');
    }

05. One resume() throws SlideProviderException

    /**
     * 05. One resume() throws SlideProviderException
     * @expectedException \StarTribune\Workflow\ServiceObject\SlideProviderException
     * @return void
     */
    public function test05() {
        $startid = $this->startWorkflow();
        $resumeid1 = $this->wrapper->resume(array(),$startid);
    }

06. One resume() sets slide to exception1

    public function test06() {
        $startid = $this->startWorkflow();
        try {
            $resumeid1 = $this->wrapper->resume(array(),$startid);
        } catch(\StarTribune\Workflow\ServiceObject\SlideProviderException $e) {
        }
        $this->verifySlide('exception1');
    }

07. Two resumes (try/catch first one) sets slide to slide2

    public function test07() {
        $startid = $this->startWorkflow();
        try {
            $resumeid1 = $this->wrapper->resume(array(),$startid);
        } catch(\StarTribune\Workflow\ServiceObject\SlideProviderException $e) {
        }
        $resumeid2 = $this->wrapper->resume(array(),$startid);
        $this->verifySlide('slide2');
    }

08. Two resumes returns resume id === null. This indicates that we reached the End Node as expected.

    public function test08() {
        $startid = $this->startWorkflow();
        try {
            $resumeid1 = $this->wrapper->resume(array(),$startid);
        } catch(\StarTribune\Workflow\ServiceObject\SlideProviderException $e) {
        }
        $resumeid2 = $this->wrapper->resume(array(),$startid);
        $this->assertTrue(null === $resumeid2);
    }

Now that we understand how we can interact with the workflow we’ll look at the Zeta Components Wrapper and its Interface. First up is the Interface: http://otscripts.com/zeta-workflow-05-zeta-components-wrapper/.