Optimizing Operations with Batched and Throttled Execution for Experienced Database Administrators

Author: Yuri Gee

Date: 25 Sep 2025

This article showcases a functional implementation of a scheduler designed to execute and repeat a series of database commands in batch mode. Each command runs with configurable timeouts between batches and between individual commands, enabling throttling and controlled operations. The scheduler also supports resuming from the last executed batch, aligning with predefined time slots for subsequent runs.

The demonstration leverages the SugarCRM database programming interface, utilizing a custom package and supporting module created via Module Builder. Administrators should review and adapt the solution before using it, whether permanently or on demand, and access must be strictly limited to users with administrator privileges.

Scheduler operational list view

This illustrative tool offers robust capabilities for advanced operations when properly configured. It can be valuable in self-managed environments and can also support SLA-driven requirements. However, since it is provided as-is, it should be thoroughly tested and used with caution. Its effectiveness relies heavily on the accuracy of the SQL queries involved, which should be crafted by experienced database administrators.

Such queries must be designed to complete reliably, even when executed multiple times within defined limits. Because they can bypass higher-level APIs, it is essential that they populate all necessary fields to ensure changes can be properly tracked and acted on—both within the database and the module housing the queries.
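
As a rough illustration of what "complete reliably when executed multiple times" can mean, the sketch below simulates a self-limiting batch against an in-memory array rather than a real table. The sample SQL string, the row data, and the restoreBatch helper are assumptions made for this example and are not part of the scheduler. The idea is that the WHERE clause excludes rows that were already processed, so each repetition shrinks the remaining work and the final run affects zero rows.

```php
<?php
// Hypothetical command as it might be stored in the module's command field.
// Rows that are already restored no longer match "deleted = 1", so reruns converge.
$exampleCommand = "UPDATE accounts SET deleted = 0, date_modified = UTC_TIMESTAMP() "
    . "WHERE deleted = 1 AND date_modified > '2025-09-01' LIMIT 2";

// In-memory stand-in for the table: id => deleted flag.
$rows = [1 => 1, 2 => 1, 3 => 0, 4 => 1, 5 => 1, 6 => 1];

// Simulates one batch: restores at most $limit deleted rows and returns the affected count.
function restoreBatch(array &$rows, int $limit): int
{
    $affected = 0;
    foreach ($rows as $id => $deleted) {
        if ($affected >= $limit) {
            break;
        }
        if ($deleted === 1) {
            $rows[$id] = 0;
            $affected++;
        }
    }
    return $affected;
}

// Repeat until a batch affects no rows, similar to the scheduler's "no rows processed" stop condition.
$loops = 0;
$total = 0;
do {
    $affected = restoreBatch($rows, 2);
    $total += $affected;
    $loops++;
} while ($affected > 0);

echo "Restored {$total} rows in {$loops} batches" . PHP_EOL;
```

A query whose WHERE clause keeps matching the rows it has just changed would never reach the empty batch, which is why Row Max and Loop Max exist as a backstop.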

A Few Practical Use Cases

When combined with a smart scheduler, SQL becomes a highly effective mechanism capable of managing a broad spectrum of tasks. Here are several practical examples of its potential applications:

  • Restoring deleted records and their associated relationships.
  • Recalculating values for specific formulas or computed fields.
  • Removing user session states or server-side caches.
  • Purging search queues tied to hard-deleted records using advanced join operations.
  • Executing advanced storage cleanup queries that can be throttled during off-peak hours.
  • Periodically inserting measurement data points to monitor and manage storage usage, based on custom query logic (as demonstrated in related articles).

Queries can even be designed to dynamically generate additional entries within the Scheduler Queries module itself, enabling real-time evaluation and processing of more complex conditions—for example retrieving targeted record subsets from advanced joins for further operations.

Scheduler code

The scheduler is built based on the example provided in the developer guide and is composed of the following files.

Main scheduler code

custom/Extension/modules/Schedulers/Ext/ScheduledTasks/job_runcommands.php

<?php
//custom/Extension/modules/Schedulers/Ext/ScheduledTasks/job_runcommands.php
const RUNCMD_DEFAULT_SLEEP = 1;
const RUNCMD_MAX_SLEEP = 3600;
const RUNCMD_DEFAULT_LIMIT = 10000;
const RUNCMD_MAX_LIMIT = 10000;
const RUNCMD_DEFAULT_ROW_MAX = 1;
const RUNCMD_MAX_ROW_MAX = 10000000;
const RUNCMD_DEFAULT_LOOP_MAX = 1;
const RUNCMD_MAX_LOOP_MAX = 1000000;
const RUNCMD_MAX_COMMAND_LENGTH = 1000;
const RUNCMD_MAX_LOGS = 1000;
const RUNCMD_TABLE_NAME = 'dq_dbqueries';
const RUNCMD_LOCK_NAME = 'job_runcommands_lock';

array_push($job_strings, 'job_runcommands');
function job_runcommands() {

    $lockName = RUNCMD_LOCK_NAME;
    $lockAcquired = false;
    $logCount = 0;

    try {
        // Acquire named lock (wait up to 1 second)
        safeLogFatal("Starting runcommands scheduler...");
        $lockQuery = "SELECT GET_LOCK('{$lockName}', 1) AS lock_status";
        $lockResult = $GLOBALS['db']->fetchByAssoc($GLOBALS['db']->query($lockQuery))['lock_status'];   

        if ($lockResult != 1) {
            safeLogFatal("Could not acquire lock '{$lockName}'. Another instance may be running.");
            return true;
        }
        $lockAcquired = true;
       
        while (checkSchedulerStatus()) {
            // Fetch the next unhandled command in order
            $nextCommandQuery = "SELECT id, num, command, wait_this, wait_next, row_count, loop_count, cap, row_max, loop_max FROM " . RUNCMD_TABLE_NAME . " WHERE num > 1 AND handled = 0 ORDER BY num ASC LIMIT 1";
            $commandRow = $GLOBALS['db']->fetchByAssoc($GLOBALS['db']->query($nextCommandQuery));

            if (!$commandRow) { 
                safeLogFatal("No unhandled command found. Exiting."); 
                return true; 
            }
        
            $id = $commandRow['id'];
            $command = (string)$commandRow['command'];

            //TO_NOTE: If the job times out, it resumes using the last saved row and loop counts. 
            //However, some discrepancies may arise due to batch query timing, as execution and count updates are not fully synchronized within a single transaction.
            $rowCount = max(0, (int)($commandRow['row_count'] ?? 0));
            $loopCount = max(0, (int)($commandRow['loop_count'] ?? 0));

            $waitThis = validateInt($commandRow['wait_this'], RUNCMD_DEFAULT_SLEEP, 1, RUNCMD_MAX_SLEEP);
            $waitNext = validateInt($commandRow['wait_next'], RUNCMD_DEFAULT_SLEEP, 1, RUNCMD_MAX_SLEEP);
            $cap = validateInt($commandRow['cap'], RUNCMD_DEFAULT_LIMIT, 1, RUNCMD_MAX_LIMIT);
            $rowMax = validateInt($commandRow['row_max'], RUNCMD_DEFAULT_ROW_MAX, 1, RUNCMD_MAX_ROW_MAX, true);
            $loopMax = validateInt($commandRow['loop_max'], RUNCMD_DEFAULT_LOOP_MAX, 1, RUNCMD_MAX_LOOP_MAX, true);

            // TO_NOTE: Some commands may not include a single 'limit' clause or may place it elsewhere. Ensure 'LIMIT cap' is appended if missing.
            if (!preg_match('/\blimit\b/i', $command)) $command .= " limit {$cap}";
            $limit = preg_match('/\blimit\s+(\d+)/i', $command, $matches) ? (int)$matches[1] : 1;

            if (!validateCommand($command) || $limit === 0) {
                safeLogFatal("Command ID {$id} rejected due to unsafe syntax or keywords.",$logCount);
                $GLOBALS['db']->query("UPDATE " . RUNCMD_TABLE_NAME . " SET handled = 1, is_error = 1 WHERE id = '{$id}'");
                continue;
            }

            // TO_NOTE: 'limit' serves only as an estimate; the actual number of affected records may vary, especially for inserts
            // For each command, adjust rowMax and loopMax as needed, and ensure SQL statements end naturally to prevent infinite loops
            if ($rowMax <= 0) $rowMax = min(max($rowMax, $limit), RUNCMD_MAX_ROW_MAX);
            if ($loopMax <= 0) $loopMax = min(ceil($rowMax / $limit), RUNCMD_MAX_LOOP_MAX);

            safeLogFatal("Processing {$id}: rows {$rowCount}/{$rowMax}, loops {$loopCount}/{$loopMax}, times//log {$waitThis}/{$waitNext}/{$logCount} command: {$command}", $logCount);

            // Repeat execution until no rows are returned or max rows or loops exceeded
            while (true) {

                if (!checkSchedulerStatus()) return true;

                try {
                    $result = $GLOBALS['db']->query($command);

                    $error = $GLOBALS['db']->lastError();
                    if (!empty($error) && strpos($error, (string)$command) !== false) throw new Exception("SQL Error detected: {$error}");

                    //TO_NOTE: loop max could potentially be adjusted dynamically by comparing $processedRows against the initial estimate from $limit.
                    $processedRows = $GLOBALS['db']->getAffectedRowCount($result);
                    //safeLogFatal("Processed rows this cycle: {$processedRows}",$logCount);

                    // Update rowCount and loopCount
                    $rowCount = min(max($rowCount + $processedRows, 0), 2147483647);
                    $loopCount++;
                    $GLOBALS['db']->query("UPDATE " . RUNCMD_TABLE_NAME . " SET row_count = {$rowCount}, is_error = 0, loop_count = {$loopCount} WHERE id = '{$id}'");

                    if ($processedRows <= 0 || $rowCount >= $rowMax || $loopCount >= $loopMax) {
                        // Mark as handled and exit loop
                        $GLOBALS['db']->query("UPDATE " . RUNCMD_TABLE_NAME . " SET handled = 1 WHERE id = '{$id}'");
                        break;
                    }

                    //TO_NOTE: timeout mechanism can be improved based on query expected run time
                    sleep($waitThis);
                }
                catch (Exception $e) {
                    safeLogFatal("Error executing command ID {$id}: " . $e->getMessage(),$logCount);
                    $GLOBALS['db']->query("UPDATE " . RUNCMD_TABLE_NAME . " SET handled = 1, is_error = 1 WHERE id = '{$id}'");
                    break;
                }
            }
            
            //TO_NOTE: timeout mechanism can be improved based on query expected run time
            sleep($waitNext);
        }

        return true;
    }  
    catch (Exception $e) {
        safeLogFatal("Fatal error in scheduler: " . $e->getMessage());
        return false;
    } 
    finally {
        if ($lockAcquired) {
            try {
                $releaseQuery = "SELECT RELEASE_LOCK('{$lockName}')";
                $GLOBALS['db']->query($releaseQuery);
                safeLogFatal("Lock '{$lockName}' released.");
            } catch (Exception $e) {
                safeLogFatal("Failed to release lock '{$lockName}': " . $e->getMessage());
            }
        }
    }
}

function checkSchedulerStatus() {
    $checkQuery = "SELECT id FROM " . RUNCMD_TABLE_NAME . " WHERE num = 1 AND handled = 0 LIMIT 1";
    $check_result = $GLOBALS['db']->query($checkQuery);

    if ($GLOBALS['db']->getRowCount($check_result) == 0) {
        safeLogFatal("Scheduler is instructed to stop (num=1 handled status).");
        return false;
    }

    return true;
}

function validateInt($value, int $default, int $min, int $max, bool $allowZero = false): int 
{
    $value = (int)($value ?? $default);
    if ($allowZero && $value === 0) return 0;
    return ($value >= $min && $value <= $max) ? $value : $default;
}

function validateCommand($cmd) {
    $cmd = preg_replace('/\s+/', ' ', trim($cmd));
    if ($cmd === '' || strpos($cmd, ';') !== false || strpos($cmd, '--') !== false ||
      strpos($cmd, '/*') !== false || strpos($cmd, '*/') !== false || strlen($cmd) > RUNCMD_MAX_COMMAND_LENGTH) return false;

    $disallowedStart = ['select', 'show', 'describe', 'explain', 'with'];
    $allowedStart = ['update', 'delete', 'insert'];

    $firstWord = strtolower(strtok($cmd, " \t\n\r\0\x0B"));

    if (in_array($firstWord, $disallowedStart)) return false;
    if (!in_array($firstWord, $allowedStart)) return false;

    $cmdStripped = preg_replace(["/'[^']*'/", '/\"[^\"]*\"/', '/`[^`]*`/'], '', $cmd);
    $forbiddenKeywords = [
        'drop', 'truncate', 'alter', 'shutdown', 'grant', 'revoke',
        'exec', 'execute', 'call', 'load_file', 'outfile', 'dumpfile',
        'union', 'create', 'replace', 'handler', 'open', 'prepare',
        'deallocate', 'kill', 'lock', 'unlock', 'rename', 'flush'
    ];
    foreach ($forbiddenKeywords as $keyword) {
        if (preg_match("/\\b{$keyword}\\b/i", $cmdStripped)) {
            return false;
        }
    }
    return true;
}

function safeLogFatal($msg, &$logCount = null, $maxLength = 500, $maxLogs = RUNCMD_MAX_LOGS) 
{    
    $count = 0;
    if (isset($logCount)) $count = &$logCount; 

    if ($count++ >= $maxLogs) {
        if ($count === $maxLogs + 1) $GLOBALS['log']->fatal("RunCommands: Too many logs. Further messages suppressed.");
    }
    else
    {
        $msg = is_string($msg) ? $msg : "Message should be a string";
        if ($maxLength && strlen($msg) > $maxLength) $msg = substr($msg, 0, $maxLength) . '... [truncated]';

        $GLOBALS['log']->fatal("RunCommands: " . $msg);
    }
}

Language file(s)

custom/Extension/modules/Schedulers/Ext/Language/en_us.job_runcommands.php

<?php
//custom/Extension/modules/Schedulers/Ext/Language/en_us.job_runcommands.php

$mod_strings['LBL_JOB_RUNCOMMANDS'] = 'Job Run SQL Commands';

Example manifest file, if distributed in a package

<?php

$manifest = array(
    'acceptable_sugar_flavors' => array('PRO','ENT','ULT'),
    'acceptable_sugar_versions' => array(
        'exact_matches' => array(),
        'regex_matches' => array('(.*?)\\.(.*?)\\.(.*?)$'),
    ),
    'author' => 'SugarCRM demo',
    'description' => 'Copies scheduler for batch processing DB commands',
    'icon' => '',
    'is_uninstallable' => true,
    'name' => 'Example Job Run SQL Commands',
    'published_date' => '2025-09-22 00:00:00',
    'type' => 'module',
    'version' => '1.1.0',
);

$installdefs = array(
    'id' => 'package_1530308899',
    'copy' => array(
        array(
            'from' => '<basepath>/Files/custom/Extension/modules/Schedulers/Ext/ScheduledTasks/job_runcommands.php',
            'to' => 'custom/Extension/modules/Schedulers/Ext/ScheduledTasks/job_runcommands.php',
        ),
        array(
            'from' => '<basepath>/Files/custom/Extension/modules/Schedulers/Ext/Language/en_us.job_runcommands.php',
            'to' => 'custom/Extension/modules/Schedulers/Ext/Language/en_us.job_runcommands.php',
        ),
    ),
);

Scheduler configuration

To support the scheduler's operation, we need a custom module named DBQueries (dq_dbqueries table). This module should include all required fields with appropriate data types and default values as illustrated.

Checkbox fields (handled and is_error) should default to unchecked, and Integer fields should default to 0, with the following exceptions:

  • cap: default value set to 10000
  • wait_this: default value set to 1
  • wait_next: default value set to 1

Additionally:

  • The num field should be configured to auto-increment.
  • The command field should be defined as a TextArea to accommodate larger query content.

DBQueries module as designed in Module Builder

Appropriate measures should be implemented to ensure that the module and its records are accessible exclusively to administrator users. It is also advisable to display the relevant field values within both the module’s Record View and List View for clarity and ease of management. The layout will appear as follows.

The scheduler operational View

A crucial setup step involves creating the initial record with num = 1, which serves as the management anchor for the scheduler. This record—referred to as "core" in the screenshot—must have its Handled checkbox unchecked to allow the scheduler to begin executing commands. If automation is required, this checkbox can be managed through an administrative BPM process or other automated mechanisms.

Next, you’ll need to configure a scheduler that activates the job titled Job Run SQL Commands, and assign an appropriate schedule to it. For tighter control over execution timing, it’s recommended to disable the Execute If Missed checkbox.

As the scheduler runs, Row Count and Loop Count will update in real time, reflecting the progress of command execution. Logging is handled via the Sugar Log, with output volume determined by the scheduler code constant—up to a maximum of 1000 log entries per run (configurable). Successfully processed commands will be marked as Handled, while failed ones will be flagged as Error.

To prevent runaway execution or excessive record processing, Row Max and Loop Max act as safeguards. However, these are basic precautions and may be exceeded if the scheduler crashes or terminates before updating the counters for the batch being processed. The reliability of this mechanism also depends heavily on the quality of the SQL queries, particularly their ability to execute in short batches and eventually complete without looping indefinitely. For this reason, queries should be carefully tested and formulated by experienced database administrators to ensure they meet all SLA requirements specific to your environment.

If a query lacks an explicit batching limit, the record's Cap value (10,000 by default) is applied by appending a LIMIT clause, for example LIMIT 10000, to the query. If a LIMIT clause is already present, its first occurrence in the query is used as the estimated limit. Note that this estimate may not reflect the actual number of records affected by the query, but it helps define Row Max and Loop Max when those values are left undefined (0) by the administrator.

By default:

  • If Row Max is 0, it will be set to the query's limit (or the default cap if no limit is defined).
  • If Loop Max is 0, it will default to Row Max / Limit, rounded up, which evaluates to 1 when Row Max was also left at 0. This ensures the query runs at least once and keeps the number of batches aligned with the Row Max value, based on the initial estimated limit.

The scheduler code can be further enhanced to support dynamic adjustments based on the actual number of records affected by each query batch and the intended Row Max target.
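
The default resolution described above can be traced with a small standalone sketch. It mirrors the relevant lines of the scheduler code, but the resolveLimits function name, the SKETCH_ constants, and the demo_table queries are assumptions introduced only for this example.

```php
<?php
// Upper bounds mirroring RUNCMD_MAX_ROW_MAX and RUNCMD_MAX_LOOP_MAX from the scheduler code.
const SKETCH_MAX_ROW_MAX = 10000000;
const SKETCH_MAX_LOOP_MAX = 1000000;

// Returns [command, limit, rowMax, loopMax] after applying the defaults.
function resolveLimits(string $command, int $cap, int $rowMax, int $loopMax): array
{
    // Append the cap only when the command contains no LIMIT keyword at all.
    if (!preg_match('/\blimit\b/i', $command)) {
        $command .= " limit {$cap}";
    }
    // The first "LIMIT <n>" found serves as the per-batch estimate.
    $limit = preg_match('/\blimit\s+(\d+)/i', $command, $m) ? (int)$m[1] : 1;
    if ($limit === 0) {
        // The scheduler rejects such commands before reaching this point.
        throw new InvalidArgumentException('LIMIT 0 is not allowed');
    }
    if ($rowMax <= 0) {
        $rowMax = min(max($rowMax, $limit), SKETCH_MAX_ROW_MAX);
    }
    if ($loopMax <= 0) {
        $loopMax = (int)min(ceil($rowMax / $limit), SKETCH_MAX_LOOP_MAX);
    }
    return [$command, $limit, $rowMax, $loopMax];
}

// No LIMIT, Row Max 0, Loop Max 0: the cap is appended and a single batch is planned.
$a = resolveLimits("DELETE FROM demo_table WHERE deleted = 1", 10000, 0, 0);
// $a[1] = 10000, $a[2] = 10000, $a[3] = 1

// Explicit LIMIT 500 with Row Max 1200: Loop Max becomes ceil(1200 / 500) = 3.
$b = resolveLimits("DELETE FROM demo_table WHERE deleted = 1 LIMIT 500", 10000, 1200, 0);
// $b[1] = 500, $b[2] = 1200, $b[3] = 3
```

Because the limit is only an estimate, three batches of up to 500 rows can overshoot a Row Max of 1200 by a few hundred rows before the stop condition is checked.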

Here’s how typical processing appears in the Sugar logs: it displays the number of rows processed vs max, loop counts vs max, timeout intervals, and log entry totals—alongside the actual SQL command being executed.

Scheduler operations log in Sugar Log

Additional Notes and Considerations

The current implementation relies on a custom query sanitation process and the legacy database API. It may need to be updated to use a newer database management API (with prepared statements) or tailored to the specific database version in use. This includes better handling of commands like INSERT, which can significantly increase database size if run incorrectly, and strengthening the sanitation logic based on the nature of the tasks being executed.
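
To get a feel for how the current sanitation behaves before strengthening it, the snippet below runs a few sample commands through validateCommand. The function body is copied from the scheduler listing so that the snippet runs on its own; the sample commands and their table names are made up for illustration.

```php
<?php
const RUNCMD_MAX_COMMAND_LENGTH = 1000;

// Copied from job_runcommands.php so this example is self-contained.
function validateCommand($cmd) {
    $cmd = preg_replace('/\s+/', ' ', trim($cmd));
    if ($cmd === '' || strpos($cmd, ';') !== false || strpos($cmd, '--') !== false ||
      strpos($cmd, '/*') !== false || strpos($cmd, '*/') !== false || strlen($cmd) > RUNCMD_MAX_COMMAND_LENGTH) return false;

    $disallowedStart = ['select', 'show', 'describe', 'explain', 'with'];
    $allowedStart = ['update', 'delete', 'insert'];

    $firstWord = strtolower(strtok($cmd, " \t\n\r\0\x0B"));

    if (in_array($firstWord, $disallowedStart)) return false;
    if (!in_array($firstWord, $allowedStart)) return false;

    // Quoted literals and identifiers are removed before the keyword scan.
    $cmdStripped = preg_replace(["/'[^']*'/", '/\"[^\"]*\"/', '/`[^`]*`/'], '', $cmd);
    $forbiddenKeywords = [
        'drop', 'truncate', 'alter', 'shutdown', 'grant', 'revoke',
        'exec', 'execute', 'call', 'load_file', 'outfile', 'dumpfile',
        'union', 'create', 'replace', 'handler', 'open', 'prepare',
        'deallocate', 'kill', 'lock', 'unlock', 'rename', 'flush'
    ];
    foreach ($forbiddenKeywords as $keyword) {
        if (preg_match("/\\b{$keyword}\\b/i", $cmdStripped)) {
            return false;
        }
    }
    return true;
}

// Hypothetical sample commands.
$samples = [
    'keyword inside a literal' => "UPDATE notes SET description = 'drop by later' WHERE deleted = 0 LIMIT 100",
    'REPLACE() string function' => "UPDATE accounts SET name = REPLACE(name, 'a', 'b') LIMIT 100",
    'read-only statement'      => "SELECT id FROM accounts LIMIT 1",
    'stacked statements'       => "DELETE FROM notes WHERE deleted = 1; DROP TABLE notes",
];

$results = [];
foreach ($samples as $label => $sql) {
    $results[$label] = validateCommand($sql);
    echo str_pad($label, 28) . ($results[$label] ? 'accepted' : 'rejected') . PHP_EOL;
}
```

Only the first sample is accepted. The second shows the trade-off of a keyword blocklist: a legitimate use of the REPLACE() string function is rejected because the check does not parse SQL, so the list may need tuning for the tasks at hand.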

To improve scheduler reliability and reporting, it may also be beneficial to implement an overall timeout mechanism that makes the job stop on its own within a predefined time window. A job that ends cleanly before the OS scheduler's process time limit is reached is recorded as successful instead of being incorrectly flagged as failed due to a timeout. A clean stop also contributes to more accurate tracking of row counts and loop counts in subsequent runs.
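
One possible shape for such an overall timeout is a time budget that is checked before each sleep. This is only a sketch: RUNCMD_MAX_RUNTIME and hasTimeBudget are hypothetical names that do not exist in the scheduler code, and the 1800-second budget is an arbitrary value that would need to match your cron and PHP time limits.

```php
<?php
// Hypothetical overall budget for one scheduler run, in seconds.
const RUNCMD_MAX_RUNTIME = 1800;

// Returns true when the run can still afford the upcoming sleep and stay inside the budget.
function hasTimeBudget(float $startedAt, int $nextSleep, float $now, int $maxRuntime = RUNCMD_MAX_RUNTIME): bool
{
    $elapsed = $now - $startedAt;
    return ($elapsed + $nextSleep) < $maxRuntime;
}

// Possible use inside the batch loop (shown as comments because it needs the scheduler context):
//   $startedAt = microtime(true);            // once, at the top of job_runcommands()
//   ...
//   if (!hasTimeBudget($startedAt, $waitThis, microtime(true))) return true;
//   sleep($waitThis);

$startedAt = 0.0;
var_dump(hasTimeBudget($startedAt, 1, 100.0));   // early in the run
var_dump(hasTimeBudget($startedAt, 60, 1750.0)); // a 60 second sleep would cross the budget
```

Returning true at that point leaves the current command unhandled with its counters saved, so the next scheduled run can pick it up again.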

The effectiveness of the scheduler largely depends on how well SQL queries are structured for batching. Queries should be optimized to execute within short time frames per batch. This may require adding indexes, refining join conditions, and avoiding performance bottlenecks caused by ordering or non-indexed columns. Poorly constructed queries can easily result in timeouts.

After a batch completes, you can pause further processing by checking the Handled flag on the core record (num = 1). Clearing the flag later allows execution to resume as needed.
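
For administrators who prefer to toggle the core record outside the UI, the statements could look roughly like the ones built below. This is a sketch under assumptions: coreRecordSql is a hypothetical helper, the statement is meant to be run manually by an administrator through a database client, and the date_modified column is assumed to exist on the Module Builder table.

```php
<?php
// Table name matching RUNCMD_TABLE_NAME in the scheduler code.
const DQ_TABLE = 'dq_dbqueries';

// Builds the statement that pauses (handled = 1) or resumes (handled = 0) the scheduler.
function coreRecordSql(bool $pause): string
{
    $flag = $pause ? 1 : 0;
    // date_modified is updated so the change stays visible in the module (assumed column).
    return "UPDATE " . DQ_TABLE . " SET handled = {$flag}, date_modified = UTC_TIMESTAMP() WHERE num = 1";
}

echo coreRecordSql(true) . PHP_EOL;  // pause
echo coreRecordSql(false) . PHP_EOL; // resume
```

Since checkSchedulerStatus() is evaluated before each batch, a pause takes effect after the batch that is currently running and its sleep interval have finished.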

The scheduler job currently runs in a single-threaded mode using a database lock to prevent concurrent execution, simplifying the code. However, multi-threading could be explored as a future enhancement.

The following constraints apply to scheduler operation:

  • Command size is capped at 1000 characters.
  • The maximum number of batches (Loop Max) is 1 million, the maximum number of affected rows (Row Max) per command is 10 million, and the default query limit (Cap) is 10,000.
  • Log messages per run are restricted to 1000 entries.
  • Sleep intervals (Wait This, Wait Next), both between batches and between commands, can range from 1 second to 3600 seconds.

All these parameters are configurable via scheduler constants in the code, including the custom module table name used by the scheduler.
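
One detail worth knowing when setting these values: in the listing, validateInt does not clamp an out-of-range value to the nearest bound but falls back to the default. The snippet below copies the function from the scheduler code so it runs standalone; the sample inputs are illustrative.

```php
<?php
// Copied from job_runcommands.php so this example is self-contained.
function validateInt($value, int $default, int $min, int $max, bool $allowZero = false): int
{
    $value = (int)($value ?? $default);
    if ($allowZero && $value === 0) return 0;
    return ($value >= $min && $value <= $max) ? $value : $default;
}

// Wait This of 30 seconds is inside 1..3600 and is kept.
$waitOk = validateInt('30', 1, 1, 3600);          // 30

// Wait This of 5000 seconds exceeds 3600 and falls back to the default of 1, not to 3600.
$waitTooLong = validateInt(5000, 1, 1, 3600);     // 1

// Row Max of 0 is allowed and later replaced by the query limit.
$rowMaxUnset = validateInt(0, 1, 1, 10000000, true); // 0

// Row Max above 10 million falls back to the default of 1.
$rowMaxTooBig = validateInt(20000000, 1, 1, 10000000, true); // 1

// A missing Cap value resolves to the default of 10000.
$capMissing = validateInt(null, 10000, 1, 10000); // 10000
```

In practice this means an overly large Wait This or Row Max silently becomes 1, so it is worth checking the "Processing" log line to confirm the values the scheduler actually used.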

Your feedback and suggestions are welcome.