2013-11-03

PHP parallel job scheduling - 3

In the last two posts  on parallel processing of workflows ,  PHP parallel job scheduling part 1  and part 2  I have described how to parallel process workflow steps to gain wall clock time. But if an individual step takes to long time the step must be cut up into smaller pieces and processed in parallel. Here I describe how I cut up a too big job into right sized pieces and parallel process those pieces together in my PHP job scheduler. I call workflows schedule  and workflow steps job , both schedules and jobs are defined in xml  scripts .  In this post I introduce iterators which  allows to express complex (programming) logic within xml. XML is a markup language that defines a set of rules for encoding documents, it is not a Turing-complete  language per se and it is very hard to express and interpret normal programing logic in xml if you do 'standard' xml parsing. (I use xml because it's flexible and there are an abundance of xml parsers available.) One use of the job iterators is to cut up a job into smaller pieces to save time when the job takes too long time to run.
What is too long time? When should we cut a job into smaller pieces? ETL processes that takes longer time than the time period of source data, e.g. if it takes 25 hours to load 24 hours of source data, that it is definitely too long time. But when you hear ‘it is not reasonable to load warehouse transactions more that twice a day, the load time is too long’  and you want those transactions loaded three times a day, you got a case of too long time. Another case of too long time is, when the source system has a shorter time limit of transactions than the extraction time, e.g. it takes 25 minutes to extract parts data from a source system but there is time limit on the extraction transaction of 20 minutes.      
In my job scheduler an iterator is a PHP array construct used for iterating a schedule or a job (or a template), the iterator is declared with a <forevery>  xml tag. Let’s look at some simple examples:
The job A will run twice since the iterator contains 2 rows, we will execute the SQL statements twice. This is absolutely meaningless but consider this schedule:
The <forevery> iterator contains two rows with one column ‘TABLE’.
The @TABLE carries the TABLE value from the iterator to the sql script. Now we see some functionality in the iterator but it’s still pretty lame, but the next one starts to be useful:
Now we can get a report for all the tables in the test database, it’s a clunky example but it illustrates one function of the <forevery> iterator. The <forevery> iterator contains the result table of the sql query SELECT ‘TABLE_NAME’ …, then we run the job sql query - select ‘@TABLE’... once for each row in the <forevery> iterator. When we have traversed the iterator we will have the report with table-name and rows for all tables in the test database.
So far in this post all examples are sequential, suppose the job sql query in the example above where a long running query then it would nice if we could parallel execute these queries like:
And this is what this example does. This is a very advanced example which is worth study in detail, this parallel example follows the map and reduce pattern . First we map the work to be done by execute the <forevery>  sql, here we limit the work to four rows which is spread onto two workers. The parallel=’2’  declarative on <forevery>  means release the work  onto two workers and queue up the rest and release them one by one when a worker is free. The result of each ‘row-query’ is converted into a file by the <sqlconverter> , the target file is named ‘report’ + the row number of the iterator + ‘.CSV’.
When the iterator is traversed we have 4 result files:
  • report 0 .CSV
  • report 1 .CSV
  • report 2 .CSV
  • report 3 .CSV
These files are then reduced into the  ‘report.CSV’ file by the <exit pgm=reduceDriver_default.php…/> .
 This is an extract from the log, it’s less morbid than it looks, workers are only computer processes.        
  1. 17660 - a <forevery> iterator with 4 rows is created
  2. 17660 - submits two iterator rows and states all workers busy, waiting for someone to die
  3. 17661 - writes name of home directory name - (1st iterator row starts)
  4. 17662 - writes name of home directory name - (2nd iterator row starts)
  5. 17663 - writes name of home directory name - (3d iterator row starts!)
  6. 17660 - submits the 3d row  and  states all workers busy, waiting for someone to die
  7. 17664 - writes name of home directory name - (4th iterator row starts)
Note since this is parallel execution there is no defined order of the messages; here msg 5 and 6 come in reversed order. If you have read this post so far I really think you should study this example carefully. There are some complex things under the hood. Event driven parallel execution with queue handling expressed in succinct XML parsed and executed  by PHP.
And here is the reduced result, one line from each iterator row.  
But this is not all, there is more to come . What do you do when the setup time is large? e.g. You have 10000 iterator rows, one row takes 30 seconds to process of those seconds 20 is setup time, then you like to create batches of rows and submit those batches for execution and that is what iterator chunking  and   piggyback iterators are for . This I describe in the next post.
In  part 1  I wrote you should look at this code  instead of my code below which shows how I deal with (parallel queues for) job iterators.  
<?php
/* Copyright (C) 2006 Lars Johansson
The awap program product is free software;
you can redistribute it and/or modify it under the terms of the
GNU General Public License as published by the Free Software Foundation;
either version 2 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. */
/*
/ extract of code for parallel execute iterator rows
*/
foreach($driver['rows'] as $driverIndex => $driverRow) { //job iterator
                $driverRow['driverIndex'] = $driverIndex;
                if (array_key_exists('--showdriver',$context['_startup'])) $log->dumpit('Note','driver row', $driverRow);
                if(array_key_exists('wait',$driver) and $driverIndex){
                    $iterSleep = $driver['wait'];
                    $log->logit('Note',"Iterator sleeping $iterSleep seconds.");
                    sleep($driver['wait']);
                }
                $_RESULT = TRUE; // Posit job goes well
                $xlatearray = array_merge($driverRow,$xlateJSC); //array_merge($driverRow,$xlateJSC,$driverRow);
                if ($rowDirectory) {
                        $drvDir = $jobDir.'/drv' . $driverIndex;
                        mkdir($drvDir);
                        copyr($jobDir, $drvDir, 'drv');
                        $jobDir = $drvDir;
                        $job['_directory'] = $drvDir;
                        file_put_contents("$drvDir".'/JOB',serialize($job));
                        $drvDirArray[] = $drvDir;
                        file_put_contents("$jobDir".'/xlatearray',serialize($xlatearray));
/Problem? In scriptEx1 we translate the job array $ok = array_walk_recursive($job, 'xlateTagsInArray', $xlate);
//Here we proceed with the untranslated $job this may cause challenges in at least driverInitTerm()
//We should probably do the translation here
                }
                 if ($fork) $drvPid = pcntl_fork(); // Fork if possible  
                 if($drvPid == -1) { // who am I?
                        $log->logit('Error', " I’m forked up!  pcntl_fork pid=$drvPid. Abending...");
                        $_RESULT = FALSE;
                        return FALSE;
                 } elseif($drvPid) { // I’m the parent process, control max forks
                        if (count($pidar) >= $forkmax) {
                                $opt = 0;
                                $log->logit('Note',"Max workers=$forkmax, waiting for someone to die");
                        }
                        else $opt = WNOHANG;
                        while (($deadPid = pcntl_wait($status, $opt)) > 0) {
                                $deadI = $pidar[$deadPid];
                                $_drvResult = 0; // Posit job ends unsuccessfully.
                                if(pcntl_wifexited($status)) $_drvResult = pcntl_wexitstatus($status);
                                if ($_drvResult) $drvsOK++;
                                else $drvsNotOK++;
                                unset($pidar[$deadPid]);
                                $opt = WNOHANG;
                        }
                 } elseif($drvPid == 0) { // I’m a newly spawn kid or no fork
                        if ($fork) $log->setPid(posix_getpid());
                        $log->logit('Info',"Driver Index=$driverIndex");
//$log->dumpit('Note','XLATE',$xlatearray);
//$log->dumpit('Note','JOB',$job);
                        switch ("$jobType") {
                                case 'ftpinput':
                                case 'sql':
                                        $_RESULT = execSql($jobDir, $xlatearray);
                                break;
                                case 'pdo':
                                        $_RESULT = execPdo($jobDir, $xlatearray);
                                break;
                                case 'script':
                                        $_RESULT = execScript($jobDir, $xlatearray);
                                break;
                                case 'sendmail':
                                             if (array_key_exists('_sqlarray',$job)) $_RESULT = execSql($jobDir, $xlatearray);
                                        if ($_RESULT) $_RESULT = execMail($jobDir, $xlatearray);
                                break;
                                case 'function':
                                    $log->deprecated('Note',"use inherent_function instead!");
                                case 'inherent_function':
                                    $_RESULT = execInherentFunction($jobDir, $xlatearray);
// this must be in sync with scriptEx
                                break;
                                case 'dummy':
                                case '':
                                case NULL:
                                break;
                                default:
                                      $log->logit('Error',"Unknown job type '$jobType' found in schedule $scheduleName");
                                        $_RESULT = FALSE;
                        }
                        if ($_RESULT) $_RESULT = execChildSchedule($context,$schedule,$job,$xlatearray);
                        if ($fork) exit((int) $_RESULT);
                 }  // who am I?
                $pidar[$drvPid] = $driverIndex;
                $log->reset('mode');
                $log->trace('off');
                $jobDir = $jobDirX;
                if (array_key_exists('until',$driver)){
                    $exp = $driver['until'];
                    eval("\$bool = $exp;");
                    if ($bool){
                        $log->logit('Note',"Iterator 'until' condition is true intercepting execution");
                         break;
                    }
                }
                $ok = newFiles($context,$job,$jobDir,$driver,$driverIndex,$jobdirfiles);
        } // job iterator

No comments:

Post a Comment