6 Advanced Features

This section describes optional, advanced features of the SDK which are not covered in the RStudio Launcher Plugin SDK QuickStart Guide.

6.1 Error Handling

The RStudio Launcher Plugin SDK QuickStart Guide creates a new Error with an arbitrary name and code at each location that an error would occur. It is advisable to take a more systematic approach to error handling. The specific implementation is entirely at the discretion of the Plugin developer; however, this section discusses one possible organizational strategy.

For each category of error that may occur, create an enum which represents the specific types of errors in that category, and a function (or a few functions) which create Error objects with the correct category name.

Error codes must not begin at 0 as that would be considered a ‘Success’ error code (i.e. not an error). Error codes may be reused across categories.

6.1.1 Example

Suppose that the Orchid Organization’s developer determines that there are only two categories of error that cannot be covered by a system or unknown error (available in Error.hpp): Mars API errors and internal errors.

The developer might create the file MarsError.hpp as follows:


#ifndef ORCHID_MARS_ERROR_HPP_
#define ORCHID_MARS_ERROR_HPP_

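#include <string>

namespace rstudio {
namespace launcher_plugins {

class Error;
class ErrorLocation;

} // namespace launcher_plugins
} // namespace rstudio

// The Mars API header which declares mars_api::mars_exception must also be included here.

namespace orchid {
namespace mars {

using rstudio::launcher_plugins::Error;
using rstudio::launcher_plugins::ErrorLocation;

/** Enum which represents an internal Plugin error. */
enum class InternalError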
{
  SUCCESS = 0,
  UNKNOWN = 1,
  CONVERSION_FAILURE = 2,
  JOB_NOT_FOUND = 3
};

/** Enum which represents a Mars API Error. */
enum class MarsError
{
  SUCCESS = 0,
  UNKNOWN = 1,
  CONN_TIMEOUT = 2,
  NOT_AUTHORIZED = 3,
  UNSUPPORTED_VERSION = 4
};

Error createMarsError(
    const mars_api::mars_exception& in_exception,
    const ErrorLocation& in_location);

Error createMarsError(
    const mars_api::mars_exception& in_exception,
    const Error& in_cause,
    const ErrorLocation& in_location);

Error createVersionError(
    const std::string& in_supportedVersion,
    const std::string& in_actualVersion,
    const ErrorLocation& in_location);

Error createInternalError(
    InternalError in_code,
    const std::string& in_message,
    const ErrorLocation& in_location);

Error createInternalError(
    InternalError in_code,
    const std::string& in_message,
    const Error& in_cause,
    const ErrorLocation& in_location);

} // namespace mars
} // namespace orchid

#endif

Then MarsError.cpp might look like this:


#include "MarsError.hpp"

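#include <Error.hpp>

#include <string>

namespace orchid {
namespace mars {

// Brings Error, ErrorLocation, and Success into scope.
using namespace rstudio::launcher_plugins;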

namespace {

constexpr const char* s_internalErrorName = "InternalPluginError";
constexpr const char* s_marsErrorName = "MarsApiError";

} // anonymous namespace

Error createMarsError(
    const mars_api::mars_exception& in_exception,
    const ErrorLocation& in_location)
{
  return createMarsError(in_exception, Success(), in_location);
}

Error createMarsError(
    const mars_api::mars_exception& in_exception,
    const Error& in_cause,
    const ErrorLocation& in_location)
{
  MarsError code = MarsError::UNKNOWN;
  if (in_exception.code() == 14) // Connection timeout
    code = MarsError::CONN_TIMEOUT;
  else if (in_exception.code() == 52) // Not authorized
    code = MarsError::NOT_AUTHORIZED;
  
  if (in_cause == Success())
    return Error(
      s_marsErrorName,
      static_cast<int>(code),
      in_exception.what(),
      in_location);
  
  return Error(
    s_marsErrorName,
    static_cast<int>(code),
    in_exception.what(),
    in_cause,
    in_location);
}

Error createVersionError(
    const std::string& in_supportedVersion,
    const std::string& in_actualVersion,
    const ErrorLocation& in_location)
{
  return Error(
    s_marsErrorName,
    static_cast<int>(MarsError::UNSUPPORTED_VERSION),
    "Mars API is version \"" + 
      in_actualVersion + 
      "\" which is not supported. Supported version(s): "
      + in_supportedVersion,
    in_location);
}

Error createInternalError(
    InternalError in_code,
    const std::string& in_message,
    const ErrorLocation& in_location)
{
  return Error(
    s_internalErrorName,
    static_cast<int>(in_code),
    in_message,
    in_location);
}

Error createInternalError(
    InternalError in_code,
    const std::string& in_message,
    const Error& in_cause,
    const ErrorLocation& in_location)
{
  return Error(
    s_internalErrorName,
    static_cast<int>(in_code),
    in_message,
    in_cause,
    in_location);
}

} // namespace mars
} // namespace orchid

As an example of how these functions might be used, consider the example in TODO #10 of the RStudio Launcher Plugin SDK QuickStart Guide. With this new error handling, the Plugin developer may change the implementation to the following:

Error MarsJobStatusWatcher::getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const
{
  const options::MarsOptions& opts = options::MarsOptions::getInstance();
  mars_api::job marsJob;
  try
  {
    // std::stoul throws std::invalid_argument or std::out_of_range if the conversion fails.
    unsigned long id = std::stoul(in_jobId);
    marsJob = mars_api::list_job(opts.getMarsServiceUser(), id);
  }
  catch (const std::invalid_argument& e)
  {
    return createInternalError(
      InternalError::CONVERSION_FAILURE, 
      "Job ID " +
        in_jobId +
        " could not be converted to unsigned long: " +
        e.what(),
      ERROR_LOCATION);
  }
  catch (const std::out_of_range& e)
  {
    return createInternalError(
      InternalError::CONVERSION_FAILURE, 
      "Job ID " +
        in_jobId +
        " is out of range to be converted to unsigned long: " +
        e.what(),
      ERROR_LOCATION);
  }
  catch (const mars_api::mars_exception& e)
  {
    return createMarsError(e, ERROR_LOCATION);
  }

  // This should only be invoked for Launcher Jobs because of the filtering in pollJobStatus,
  // so return an error if somehow it's not a Launcher Job.
  if (marsJob._name.find("[RStudio Launcher]") == std::string::npos)
    return createInternalError(
      InternalError::JOB_NOT_FOUND,
      "Job " + in_jobId + " is not an RStudio Launcher Job.",
      ERROR_LOCATION);
    
  out_job = marsJobToJob(marsJob);
  return Success();
}

With this approach, the Plugin developer has created much more consistent and informative errors.

6.2 Date-Time Support

The SDK provides a utility class for working with DateTime objects (include/system/DateTime.hpp). Dates and times can be converted from string to system::DateTime using system::DateTime::fromString, and to string using system::DateTime::toString. When converting from string, the default expected time string format will be YYYY-MM-DDThh:mm:ss.ssssssTZ, or %Y-%m-%dT%H:%M:%S%F%ZP using the supported format specification. This is the ISO 8601 extended time format. When converting to string, the default output format is YYYY-MM-DDThh:mm:ss.ssssssZ, or %Y-%m-%dT%H:%M:%S%FZ. Note that the default output string is in UTC time.

To use a non-default format specification, a custom format specification may be provided to the conversion function. Below is a table which describes the possible format values. As with most string formats, characters which are not prefixed by % will be included in the output (or should be included in the input) verbatim.

| Value | Description | Example |
|-------|-------------|---------|
| %a | Short weekday | “Mon”, “Tue” |
| %A | Long weekday | “Monday”, “Tuesday” |
| %b | Short month | “Nov”, “Dec” |
| %B | Long month | “November”, “December” |
| %d | Numerical day of the month | “01” through “31”, as appropriate for the month |
| %f | Fractional seconds | “04:01:33.000000”, “04:52:16.598763” |
| %F | Fractional seconds, if non-zero | “04:01:33”, “04:52:16.598763” |
| %H | Hour value, on the 24 hour clock | “00” through “23” |
| %j | Numerical day of the year | “001” through “365” (or “366” on leap years) |
| %m | Numerical month | “01” through “12” |
| %M | Minute value | “00” through “59” |
| %s | Seconds with fractional seconds | “28.003251” |
| %S | Seconds | “28” |
| %U | Numerical week of the year, starting on a Sunday | “00” through “53”, where the first Sunday in January is the first day of week 01 |
| %w | Numerical day of the week, starting from 0 | “0” through “6” |
| %W | Numerical week of the year, starting on a Monday | “00” through “53”, where the first Monday in January is the first day of week 01 |
| %y | Two digit year | 2019 would be “19” |
| %Y | Four digit year | “2019” |
| %ZP | Posix time zone string | “-07:00”, “PST-08PDT+01,M4.1.0/02:00,M10.5.0/02:00”, “Z” |

More advanced formatting flags and additional documentation regarding the parsing and formatting of DateTime objects can be found in Boost’s Date Time I/O documentation.
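
Several of the flags in the table above (%a, %b, %d, %H, %j, %m, %M, %S, %y, %Y) have the same meaning in the C standard library's strftime function, so their effect can be tried without the SDK. The following is a minimal sketch that uses only the standard library. It does not use system::DateTime, and the helper names formatTime and makeExampleTime are made up for illustration. Boost-specific flags such as %f, %F, and %ZP are not handled by strftime.

```cpp
#include <ctime>
#include <string>

// Formats a broken-down time with std::strftime. This is a standard-library
// stand-in for experimenting with format flags, not the SDK's DateTime class.
std::string formatTime(const std::tm& in_time, const char* in_format)
{
  char buffer[128];
  // strftime returns the number of characters written, or 0 if the buffer is too small.
  const std::size_t written = std::strftime(buffer, sizeof(buffer), in_format, &in_time);
  return std::string(buffer, written);
}

// Builds the example time 2019-11-04 16:52:28, a Monday and the 308th day of the year.
std::tm makeExampleTime()
{
  std::tm result{};
  result.tm_year = 2019 - 1900; // years since 1900
  result.tm_mon = 10;           // months since January, so 10 is November
  result.tm_mday = 4;
  result.tm_hour = 16;
  result.tm_min = 52;
  result.tm_sec = 28;
  result.tm_wday = 1;           // days since Sunday, so 1 is Monday
  result.tm_yday = 307;         // days since January 1st (zero-based)
  return result;
}
```

With these helpers, formatTime(makeExampleTime(), "%Y-%m-%dT%H:%M:%SZ") yields 2019-11-04T16:52:28Z, and the format "%a %d %b %y, day %j" yields Mon 04 Nov 19, day 308 in the default "C" locale.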

6.3 User Profiles

It may be useful to allow system administrators to set default or maximum values for certain features on a per-user or per-group basis. For example, if a job scheduling system supports requesting an amount of memory for a job, system administrators may wish to give different memory levels to different groups of users. For more examples, see the sample /etc/rstudio/launcher.kubernetes.profiles.conf in the Job Launcher Plugin Configuration section of the RStudio Job Launcher Guide.

For the convenience of the Plugin developer, the AbstractUserProfiles class may be extended to quickly implement support for user profiles.

AbstractUserProfiles contains protected templated functions for getting a value by name. Because the templates are defined in the CPP file, they are available only for the following types:

  • std::string
  • uint32_t
  • int32_t
  • uint64_t
  • int64_t
  • double
  • bool
  • std::set<U>, where U is one of the above types.
  • std::vector<U>, where U is one of the above types, except std::set.
  • std::map<U, V>, where U and V are any pair of the above types.

If a custom type is needed, retrieve the value as a string and then parse it as needed. For an example, see the TestUserProfiles class in sdk/src/options/tests/UserProfilesTests.cpp.
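
As a sketch of the "retrieve as a string, then parse" approach, suppose a hypothetical profile field holds a memory size such as 512MB or 2GB. The function below is not part of the SDK; the field syntax and the name parseMemoryMb are assumptions made for illustration. A Plugin could call something like it on the string obtained from the user profile, and again from a custom validator.

```cpp
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <string>

// Parses text such as "512MB" or "2GB" into a number of megabytes.
// Returns false, leaving out_megabytes untouched, if the text is not in that form.
// For brevity, this sketch does not guard against overflow on very long digit strings.
bool parseMemoryMb(const std::string& in_value, std::uint64_t& out_megabytes)
{
  std::size_t pos = 0;
  std::uint64_t number = 0;
  while (pos < in_value.size() && std::isdigit(static_cast<unsigned char>(in_value[pos])))
  {
    number = (number * 10) + static_cast<std::uint64_t>(in_value[pos] - '0');
    ++pos;
  }

  // At least one leading digit is required.
  if (pos == 0)
    return false;

  // Everything after the digits must be a recognized unit.
  const std::string unit = in_value.substr(pos);
  if (unit == "MB")
    out_megabytes = number;
  else if (unit == "GB")
    out_megabytes = number * 1024;
  else
    return false;

  return true;
}
```

Returning a boolean keeps the sketch free of SDK types. In a real Plugin the failure case would more naturally be reported as an Error.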

The minimum requirements to implement AbstractUserProfiles are:

  • A public constructor which sets the plugin name via the AbstractUserProfiles(const std::string& in_pluginName) constructor. Alternatively, a private constructor with a public static getInstance method may be used to implement the singleton pattern. This avoids the need to read the configuration file multiple times.
  • An implementation of AbstractUserProfiles::getValidFieldNames which returns a set of all supported values that may be set via the user profiles configuration file.
  • An implementation of AbstractUserProfiles::validateValues which calls one of the two protected AbstractUserProfiles::validateValue methods for each valid field, with the appropriate template parameter.

If the above criteria are met, the expected location of the user profiles configuration file will be /etc/rstudio/launcher.<plugin name>.profiles.conf.

The AbstractUserProfiles::validateValues method is called by AbstractUserProfiles::initialize after the user profiles configuration file has been read and parsed to ensure that any configuration mistakes within the file will be caught early. The AbstractUserProfiles::initialize method should be called from the IJobSource::initialize method to ensure that the user profiles configuration file has been read into memory and parsed before the Plugin enters normal operation mode. If the user profiles initialize method returns an error, the IJobSource::initialize method should also return an error.

6.3.1 Example

This example continues the examples started in the RStudio Launcher Plugin SDK QuickStart Guide. Assume that the Mars job scheduling system supports requesting a CPU count and an amount of memory, in MB. For simplicity, this example implements the MarsUserProfiles class completely within the hpp file.

MarsUserProfiles.hpp

#include <options/AbstractUserProfiles.hpp>

#include <cassert>
#include <cstdint>
#include <set>
#include <string>

#include <Error.hpp>
#include <system/User.hpp>

namespace orchid {
namespace mars {
namespace options {
  
using namespace rstudio::launcher_plugins;
  
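// The base class is fully qualified because, inside orchid::mars::options, the unqualified name
// "options" refers to this namespace rather than to rstudio::launcher_plugins::options.
class MarsUserProfiles : public rstudio::launcher_plugins::options::AbstractUserProfiles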
{
  public:
    static MarsUserProfiles& getInstance()
    {
      static MarsUserProfiles userProfiles;
      return userProfiles;
    }

    uint32_t getDefaultCpus(const system::User& in_user) const
    {
      // Default value is 1.
      uint32_t defaultCpus = 1;
      Error error = getValueForUser("default-cpus", in_user, defaultCpus);
      
      // It shouldn't be possible to get any Error except a not-found error here because of
      // validateValues. If it somehow occurred in release, just return the default value.
      assert(!error || isValueNotFoundError(error));
      
      return defaultCpus;
    }
  
    uint32_t getMaxCpus(const system::User& in_user) const
    {
      // Default value is 1.
      uint32_t maxCpus = 1;
      Error error = getValueForUser("max-cpus", in_user, maxCpus);
      
      // It shouldn't be possible to get any Error except a not-found error here because of
      // validateValues. If it somehow occurred in release, just return the default value.
      assert(!error || isValueNotFoundError(error));
      
      return maxCpus;
    }
  
    uint32_t getMaxMemory(const system::User& in_user) const
    {
      // Default value is 10 MB.
      uint32_t maxMemory = 10;
      Error error = getValueForUser("max-memory-mb", in_user, maxMemory);
      
      // It shouldn't be possible to get any Error except a not-found error here because of
      // validateValues. If it somehow occurred in release, just return the default value.
      assert(!error || isValueNotFoundError(error));
      
      return maxMemory;
    }
  
    uint32_t getDefaultMemory(const system::User& in_user) const
    {
      // Default value is 5 MB.
      uint32_t defaultMemory = 5;
      Error error = getValueForUser("default-memory-mb", in_user, defaultMemory);
      
      // It shouldn't be possible to get any Error except a not-found error here because of
      // validateValues. If it somehow occurred in release, just return the default value.
      assert(!error || isValueNotFoundError(error));
      
      return defaultMemory;
    }
  
  private:
    MarsUserProfiles() :
      AbstractUserProfiles("mars")
    {
      m_validFieldNames.insert("max-cpus");
      m_validFieldNames.insert("default-cpus");
      m_validFieldNames.insert("max-memory-mb");
      m_validFieldNames.insert("default-memory-mb");
    }
    
    const std::set<std::string>& getValidFieldNames() const override
    {
      return m_validFieldNames;
    }
  
    Error validateValues() const override
    {
      // For supported types, validateValue will attempt to parse every occurrence of the field as
      // specified type. If a custom type is desired, use
      // AbstractUserProfiles::validateValue(
      //    const std::string& in_value,
      //    const CustomValueValidator& in_validator) const;
      // method to supply a custom validator instead. in_validator should parse the value it is
      // supplied and return an error if parsing fails.
      Error error = validateValue<uint32_t>("default-cpus");
      if (error)
        return error;
      
      error = validateValue<uint32_t>("max-cpus");
      if (error)
        return error;
      
      error = validateValue<uint32_t>("default-memory-mb");
      if (error)
        return error;
      
      return validateValue<uint32_t>("max-memory-mb");
    }
    
    std::set<std::string> m_validFieldNames;
};

} // namespace options
} // namespace mars
} // namespace orchid

MarsJobSource.cpp


// Other includes...

#include <options/MarsOptions.hpp>
#include <options/MarsUserProfiles.hpp>

// Other methods...

Error MarsJobSource::initialize()
{
  const options::MarsOptions& opts = options::MarsOptions::getInstance();
  
  try
  {
    mars_api::init(opts.host(), opts.port(), opts.useSsl());
  }
  catch (const mars_api::mars_exception& e)
  {
    return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
  }
  
  options::MarsUserProfiles& userProfiles = options::MarsUserProfiles::getInstance();
  return userProfiles.initialize();
}

// Other methods..
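
Once the profile values are available, the Plugin still has to decide how to apply them to a submitted job. The following stand-alone sketch shows one possible policy: fall back to the default when nothing was requested, and reject requests above the maximum. The names ResolvedValue and resolveCpuCount, and the convention that a request of 0 means "not specified", are assumptions made for illustration and are not part of the SDK.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>

// Outcome of applying a user's profile limits to a resource request.
struct ResolvedValue
{
  bool Ok;              // false if the request exceeds the user's maximum
  std::uint32_t Value;  // the value to submit when Ok is true
  std::string Message;  // a human-readable reason when Ok is false
};

// Applies a default and a maximum (for example, the results of getDefaultCpus and
// getMaxCpus) to a request. A request of 0 is treated here as "not specified".
ResolvedValue resolveCpuCount(
  std::uint32_t in_requested,
  std::uint32_t in_default,
  std::uint32_t in_max)
{
  // Nothing requested: use the default, but never more than the maximum.
  if (in_requested == 0)
    return { true, std::min(in_default, in_max), "" };

  // Too much requested: report why instead of silently lowering the value.
  if (in_requested > in_max)
    return {
      false,
      0,
      "Requested " + std::to_string(in_requested) +
        " CPUs, but the maximum for this user is " + std::to_string(in_max) + "." };

  return { true, in_requested, "" };
}
```

Rejecting an oversized request, rather than clamping it, is a design choice of this sketch. A Plugin could equally lower the request to the maximum if that better suits the Job Scheduling System.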

6.4 Custom Job Source Configuration

Important: This feature is not exposed through the RStudio Workbench job launching UI. The use of this feature will require a feature request to the RStudio IDE project. This feature should only be used when there are no other alternatives.

The Cluster Info Response is used to report the configuration and capabilities of the Plugin. The RStudio Launcher Plugin SDK QuickStart Guide describes how the Plugin developer may declare support for various types of resource limits, containers, custom job placement constraints, and job queues. In the event that there is some job configuration setting that is not covered by one of those built-in job settings, the Job Config feature may be used.

A JobConfig value consists of the name of the configuration setting, its type, and optionally its value. It may have one of four types: string, int, enum, or float.

To declare support for a custom job configuration value, create a JobConfig object that represents the name and type of that value and add it to the JobSourceConfiguration::CustomConfig vector in the overridden implementation of IJobSource::getConfiguration.

When a job is submitted, any custom configuration values that were set on the job can be found on Job::Config.
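
The two halves of this feature, declaring a setting and reading it back from a submitted job, can be sketched without the SDK. The JobConfig struct below is a simplified stand-in that mirrors the description above (a name, one of four types, and an optional value); the real SDK class may differ in its member names and types. The setting names mars-priority and mars-partition and the helper functions are hypothetical.

```cpp
#include <string>
#include <vector>

// Simplified stand-in for the SDK's JobConfig: a name, a type, and an optional value.
struct JobConfig
{
  enum class Type { STRING, INT, ENUM, FLOAT };

  std::string Name;
  Type ValueType;
  std::string Value; // left empty when no value has been set
};

// Declares the custom settings a hypothetical Mars Plugin supports, as it might when
// filling in JobSourceConfiguration::CustomConfig. Only the name and type are given.
std::vector<JobConfig> declareCustomConfig()
{
  return {
    { "mars-priority", JobConfig::Type::INT, "" },
    { "mars-partition", JobConfig::Type::STRING, "" }
  };
}

// Looks up a setting by name among the values submitted with a job (Job::Config).
// Returns nullptr if the setting was not supplied.
const JobConfig* findConfig(const std::vector<JobConfig>& in_config, const std::string& in_name)
{
  for (const JobConfig& config : in_config)
  {
    if (config.Name == in_name)
      return &config;
  }

  return nullptr;
}
```

Because a custom setting may be absent from any given job, the lookup returns a pointer that the caller must check before reading the value.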

6.5 Job Status Updates

The Plugin needs to keep an accurate record of all of the Jobs that were submitted to the Job Scheduling System by the Launcher. This can be implemented in any way that suits the Job Scheduling System as long as JobStatusNotifier::updateJobStatus is invoked each time the status of a job changes. The JobStatusNotifier::updateJobStatus validates that the current status really is an update, so there is no need for the Plugin implementation to do that check.
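
In practice this means the Plugin can report every status it observes, including repeats. The stand-alone sketch below illustrates the kind of check that makes repeated reports harmless. The StatusTracker class is hypothetical and is not the SDK's implementation of JobStatusNotifier, which may apply additional checks.

```cpp
#include <map>
#include <string>

// Remembers the last status seen for each job and reports whether a new
// observation is a real change.
class StatusTracker
{
public:
  // Records the status and returns true only if it differs from the last one recorded
  // for this job. The first status recorded for a job always counts as a change.
  bool update(const std::string& in_jobId, const std::string& in_status)
  {
    auto itr = m_lastStatus.find(in_jobId);
    if ((itr != m_lastStatus.end()) && (itr->second == in_status))
      return false;

    m_lastStatus[in_jobId] = in_status;
    return true;
  }

private:
  // Maps a job ID to the most recently recorded status for that job.
  std::map<std::string, std::string> m_lastStatus;
};
```

A polling implementation that reads "Pending" for the same job several times in a row would see update return true once and false afterwards, so only the first observation would need to be propagated.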

The two most common ways to implement this feature are streaming the Job statuses and polling the Job statuses. Both methods can be implemented with the help of the AbstractJobStatusWatcher base class; however, streaming is the preferred method, as it should be more efficient than polling.

The AbstractTimedJobStatusWatcher class, which extends the AbstractJobStatusWatcher class, implements common functionality for polling job statuses. For more details about implementing Job status updates via polling, see TODOs #9 through #11 in the RStudio Launcher Plugin SDK QuickStart Guide.

6.5.1 Streaming

Streaming is the preferred method for implementing job status updates, as it can be more efficient than polling job statuses. This is because polling requires making a job status request of the Job Scheduling System every interval of time, and may result in reading the same status multiple times before a change is observed. If the Job Scheduling System provides an API that streams Job status changes, the Plugin should only have to process each status change once.

6.5.1.1 Example

Suppose that the Mars API provides a stream_statuses function which takes a callback function as a parameter with the signature std::function<void(const mars_api::job_status&)>. Assume that the mars_api::job_status structure has the Job ID, the Job name, the Job Status, the last modification time, and the reason for the current Job status. Then the Plugin developer might change the implementation of MarsJobStatusWatcher to the following:

MarsJobStatusWatcher.hpp:


#ifndef ORCHID_MARS_MARS_JOB_STATUS_WATCHER_HPP
#define ORCHID_MARS_MARS_JOB_STATUS_WATCHER_HPP

#include <jobs/AbstractJobStatusWatcher.hpp>

#include <memory>

namespace orchid {
namespace mars {

class MarsJobStatusWatcher : 
  public jobs::AbstractJobStatusWatcher,
  public std::enable_shared_from_this<MarsJobStatusWatcher>
{
public:
   /**
    * @brief Constructor.
    *
    * @param in_jobRepository           The job repository, from which to look-up jobs.
    * @param in_jobStatusNotifier       The job status notifier to which to post job updates.
    */
   MarsJobStatusWatcher(
      jobs::JobRepositoryPtr in_jobRepository,
      jobs::JobStatusNotifierPtr in_jobStatusNotifier);

  /**
   * @brief Starts the Job status watcher.
   *
   * @return Success if the Job status watcher could be started; Error otherwise.
   */
  Error start();
  
  /**
   * @brief Stops the Job status watcher.
   */
  void stop();
  
private:
   /**
    * @brief Handles a change in job status when it is reported by the Mars Job Scheduling System.
    * 
    * @param in_jobStatus     The new job status.
    */
   void onJobStatusUpdate(const mars_api::job_status& in_jobStatus);

   /**
    * @brief Gets the job details for the specified job.
    *
    * @param in_jobId   The ID of the job to retrieve.
    * @param out_job    The populated Job object.
    *
    * @return Success if the job details could be retrieved and parsed; Error otherwise.
    */
   Error getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const override;
   
   // The Job status stream.
   std::unique_ptr<mars_api::status_stream> m_jobStream;
};

/** Convenience typedef. */
typedef std::shared_ptr<MarsJobStatusWatcher> MarsJobStatusWatcherPtr;

} // namespace mars
} // namespace orchid

#endif

MarsJobStatusWatcher.cpp:

#include "MarsJobStatusWatcher.hpp"

#include <string>

#include "MarsError.hpp"
#include <options/MarsOptions.hpp>

namespace orchid {
namespace mars {

typedef std::shared_ptr<MarsJobStatusWatcher> SharedThis;
typedef std::weak_ptr<MarsJobStatusWatcher> WeakThis;

MarsJobStatusWatcher::MarsJobStatusWatcher(
   jobs::JobRepositoryPtr in_jobRepository,
   jobs::JobStatusNotifierPtr in_jobStatusNotifier) :
      jobs::AbstractJobStatusWatcher(
         std::move(in_jobRepository),
         std::move(in_jobStatusNotifier))
{
}

Error MarsJobStatusWatcher::start()
{
  // Capture a weak pointer so that the callback does not keep this object alive
  // and does nothing if the watcher has already been destroyed.
  WeakThis weakThis = shared_from_this();

  try
  {
    m_jobStream = mars_api::stream_statuses(
      [weakThis](const mars_api::job_status& in_jobStatus)
      {
        if (SharedThis sharedThis = weakThis.lock())
          sharedThis->onJobStatusUpdate(in_jobStatus);
      });
  }
  catch (const mars_api::mars_exception& e)
  {
    return createMarsError(e, ERROR_LOCATION);
  }

  return Success();
}

void MarsJobStatusWatcher::stop()
{
  m_jobStream.reset();
}

void MarsJobStatusWatcher::onJobStatusUpdate(const mars_api::job_status& in_jobStatus)
{
   if (in_jobStatus._name.find("[RStudio Launcher]") != std::string::npos)
   {
      system::DateTime lastModified;
      Error error = system::DateTime::fromString(in_jobStatus._last_modified, lastModified);
      if (error)
      {
        // Use the current time as the invocation time instead, but log an error.
        logging::logError(error, ERROR_LOCATION);
        error = updateJobStatus(
          std::to_string(in_jobStatus._id),
          marsStatusToStatus(in_jobStatus._status),
          in_jobStatus._reason);
      }
      else
      {
        error = updateJobStatus(
          std::to_string(in_jobStatus._id),
          marsStatusToStatus(in_jobStatus._status),
          in_jobStatus._reason,
          lastModified);
      }
      
      if (error)
        logging::logError(error, ERROR_LOCATION);
   }
}

Error MarsJobStatusWatcher::getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const
{
  const options::MarsOptions& opts = options::MarsOptions::getInstance();
  mars_api::job marsJob;
  try
  {
    // std::stoul throws std::invalid_argument or std::out_of_range if the conversion fails.
    unsigned long id = std::stoul(in_jobId);
    marsJob = mars_api::list_job(opts.getMarsServiceUser(), id);
  }
  catch (const std::invalid_argument& e)
  {
    return createInternalError(
      InternalError::CONVERSION_FAILURE, 
      "Job ID " +
        in_jobId +
        " could not be converted to unsigned long: " +
        e.what(),
      ERROR_LOCATION);
  }
  catch (const std::out_of_range& e)
  {
    return createInternalError(
      InternalError::CONVERSION_FAILURE, 
      "Job ID " +
        in_jobId +
        " is out of range to be converted to unsigned long: " +
        e.what(),
      ERROR_LOCATION);
  }
  catch (const mars_api::mars_exception& e)
  {
    return createMarsError(e, ERROR_LOCATION);
  }

  // This should only be invoked for Launcher Jobs because of the filtering in onJobStatusUpdate,
  // so return an error if somehow it's not a Launcher Job.
  if (marsJob._name.find("[RStudio Launcher]") == std::string::npos)
    return createInternalError(
      InternalError::JOB_NOT_FOUND,
      "Job " + in_jobId + " is not an RStudio Launcher Job.",
      ERROR_LOCATION);
    
  out_job = marsJobToJob(marsJob);
  return Success();
}

} // namespace mars
} // namespace orchid

6.5.2 Other Methods

It is possible that neither streaming nor polling is the best solution for keeping job statuses up to date. The use of an AbstractJobStatusWatcher is completely optional, and the Plugin developer may choose to implement this feature in any way that best suits the Job Scheduling System. For example, the provided sample Local Plugin does not use an AbstractJobStatusWatcher. Jobs are launched on the local system by forking a new process and running the requested command in that process. The Local Plugin receives notifications when the child process writes to standard out, writes to standard error, or exits. When the process exits, the Job state is transitioned from Job::State::RUNNING to Job::State::FINISHED. The Local Plugin also keeps track of when the process should transition from Job::State::PENDING to Job::State::RUNNING by checking whether the executable name has changed from rsandbox (a utility for launching processes in an isolated environment provided with the RStudio Workbench installation) to the name of the actual executable for the Job.

6.6 Customizing the Job Repository

In ‘TODO #8’ of the ‘RStudio Launcher Plugin SDK QuickStart Guide’, the Plugin developer implemented the AbstractJobRepository::loadJobs method to populate the Job Repository on bootstrap. In the case that the Plugin needs to do special processing when a Job is added or removed from the repository, it can do so by overriding the other virtual methods on AbstractJobRepository.

An example of when this may be necessary is if the Plugin needs to do additional Job state persistence, beyond what the Job Scheduling System will save. A common case of this is Job output. If the user does not specify an output file, the Job Scheduling System may not persist the Job output; however, the output must be available to the Launcher until the Job expires according to the Launcher’s configured job-expiry-hours.

There are three additional virtual methods on AbstractJobRepository that allow the Plugin developer to customize the behavior of the Job Repository:

  • AbstractJobRepository::onJobAdded: this method will be invoked when a job is first added to the repository, immediately after successful submission.
  • AbstractJobRepository::onJobRemoved: this method will be invoked when an expired Job is removed from the system. Any files or other persistent data that were created by the Plugin should be cleaned up in this method.
  • AbstractJobRepository::onInitialize: this method will be invoked once, when the Job Repository is initialized during bootstrap. The Plugin may do any extra initialization steps that are required and is responsible for returning an Error if any necessary initialization steps fail.

The provided sample Local Launcher Plugin manages Job persistence completely within the Plugin. The LocalJobRepository implementation may be used as an example for the implementation of all three virtual methods on AbstractJobRepository.
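
The shape of such a customization can be sketched without the SDK. In the sketch below, RepositoryHooks is a simplified stand-in for the three customization points described above; the real AbstractJobRepository methods work with SDK types such as Job objects and Error rather than plain strings and booleans. OutputTrackingRepository, its output-file naming scheme, and its helper methods are hypothetical.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>

// Simplified stand-in for the three virtual customization points.
class RepositoryHooks
{
public:
  virtual ~RepositoryHooks() = default;
  virtual bool onInitialize() { return true; }
  virtual void onJobAdded(const std::string&) { }
  virtual void onJobRemoved(const std::string&) { }
};

// Tracks one output file per job so that it can be cleaned up when the job expires.
class OutputTrackingRepository : public RepositoryHooks
{
public:
  explicit OutputTrackingRepository(std::string in_outputRoot) :
    m_outputRoot(std::move(in_outputRoot))
  {
  }

  // A real Plugin would create or verify the output directory here.
  bool onInitialize() override
  {
    return !m_outputRoot.empty();
  }

  // Remember where the new job's output will be persisted.
  void onJobAdded(const std::string& in_jobId) override
  {
    m_outputFiles[in_jobId] = m_outputRoot + "/" + in_jobId + ".out";
  }

  // Forget the expired job. A real Plugin would also delete the file itself.
  void onJobRemoved(const std::string& in_jobId) override
  {
    m_outputFiles.erase(in_jobId);
  }

  // Returns the tracked output file for the job, or an empty string if there is none.
  std::string outputFileFor(const std::string& in_jobId) const
  {
    auto itr = m_outputFiles.find(in_jobId);
    return (itr == m_outputFiles.end()) ? std::string() : itr->second;
  }

  std::size_t trackedJobCount() const
  {
    return m_outputFiles.size();
  }

private:
  std::string m_outputRoot;
  std::map<std::string, std::string> m_outputFiles;
};
```

Keeping the bookkeeping in the add and remove hooks means that whatever the Plugin persists for a job has the same lifetime as the job's entry in the repository.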

6.7 Process Launching

Depending on the API exposed by the Job Scheduling System, it may be necessary to launch child processes to perform actions on the Job Scheduling System, such as running a job or listing the jobs in the system. The SDK provides a number of classes and functions for launching child processes in the system/Process.hpp header file.

Child processes launched through the SDK-provided process module are run through the rsandbox process by default. This is done to ensure that the child process will be run in an isolated environment; however, it prevents the parent process from continuing to write standard input to the child process. If this is needed by the Plugin, it is possible to launch the child process directly and keep the standard input stream open by setting system::process::ProcessOptions::UseSandbox = false and system::process::ProcessOptions::CloseStdIn = false, respectively.

The SDK process launching module will escape the command, arguments, standard input, the standard output and standard error files, and environment variables as appropriate. The command, arguments and environment values will be treated literally. Bash expansion of them will not take place. Bash expansion may take place within the standard input, however.
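
To illustrate what "treated literally" means, the sketch below shows a common POSIX shell quoting technique: wrap the value in single quotes and rewrite any embedded single quote. This is an illustration of the general idea only. It is not the SDK's escaping code, and the function name shellQuote is made up for this example.

```cpp
#include <string>

// Wraps a value in single quotes so that a POSIX shell treats it literally.
// An embedded single quote is written as '\'' (close the quoted string, emit an
// escaped quote, and reopen the quoted string).
std::string shellQuote(const std::string& in_value)
{
  std::string quoted = "'";
  for (char c : in_value)
  {
    if (c == '\'')
      quoted += "'\\''";
    else
      quoted += c;
  }
  quoted += "'";

  return quoted;
}
```

After quoting in this way, an argument such as $HOME/*.txt reaches the child process as those exact characters; neither the variable nor the wildcard is expanded.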

6.8 Custom Output Streams

To create a custom output stream, the Plugin developer must create a class which inherits from api::AbstractOutputStream and implements the api::AbstractOutputStream::start and api::AbstractOutputStream::stop methods.

In the api::AbstractOutputStream::start method, the output stream implementation should begin reporting the Job’s output. To report output, the implementation must invoke the protected method api::AbstractOutputStream::reportData, specifying the data and the type of output. The output type will be OutputType::STDOUT for standard output, OutputType::STDERR for standard error, or OutputType::BOTH if it is not possible to tell which stream the data came from. It may not be possible to tell the output type if the Job specified the same output location for both standard output and standard error.

When the stream has completed, the output stream implementation should invoke the protected method api::AbstractOutputStream::setStreamComplete. The stream is complete when the Job has finished emitting all output. This can only happen if the Job is in a completed state, which can be tested with api::Job::isCompleted. Even if a Job has completed, some Job Scheduling Systems buffer job output, so it may take a few seconds after the Job has completed for the remainder of the job output to be emitted.

If api::AbstractOutputStream::stop has been invoked, the output stream implementation should stop streaming data, even if the stream has not been completed.

For an example of a correct and complete implementation of an api::AbstractOutputStream child class, please refer to api::FileOutputStream.

6.8.1 Customizing the File Output Stream

It is possible that the Plugin will be able to read Job output from a file, but it will need to process the Job output in some way before surfacing the output to the user. For example, the RStudio Slurm Launcher Plugin emits one line of output at the start of each Job that represents extra Job metadata that it needs, and one line at the end of each Job to indicate that all output has been emitted. In that case, the Plugin may customize the behavior of the api::FileOutputStream class by inheriting from it and overriding api::FileOutputStream::onOutput and/or api::FileOutputStream::waitForStreamEnd. By default api::FileOutputStream::onOutput emits every line of output and api::FileOutputStream::waitForStreamEnd waits for a fixed short period of time after the Job enters a completed state before ending the stream.

The RStudio Slurm Launcher Plugin would override api::FileOutputStream::onOutput to skip the first and last lines of output, and to notify a condition variable when the last line of output is emitted. It would override api::FileOutputStream::waitForStreamEnd to wait on the aforementioned condition variable instead of waiting for a fixed period of time.
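
The filtering logic that such an override would need can be sketched independently of the SDK. The MetadataFilter class below is hypothetical: it treats the first line as metadata, hides a known end-marker line, and records that the stream is complete. The marker text is an assumption for illustration, and the sketch does not reproduce the signatures of api::FileOutputStream::onOutput or api::FileOutputStream::waitForStreamEnd.

```cpp
#include <string>
#include <utility>

// Decides which lines of raw job output should be surfaced to the user.
class MetadataFilter
{
public:
  explicit MetadataFilter(std::string in_endMarker) :
    m_endMarker(std::move(in_endMarker))
  {
  }

  // Returns true if the line should be shown to the user.
  bool onLine(const std::string& in_line)
  {
    // Nothing is forwarded once the end marker has been seen.
    if (m_complete)
      return false;

    // The first line carries metadata for the Plugin, not output for the user.
    if (!m_sawFirstLine)
    {
      m_sawFirstLine = true;
      m_metadata = in_line;
      return false;
    }

    // The end marker signals that all output has been emitted. An override of
    // onOutput would notify its condition variable at this point.
    if (in_line == m_endMarker)
    {
      m_complete = true;
      return false;
    }

    return true;
  }

  bool isComplete() const { return m_complete; }

  const std::string& metadata() const { return m_metadata; }

private:
  std::string m_endMarker;
  std::string m_metadata;
  bool m_sawFirstLine = false;
  bool m_complete = false;
};
```

An override of waitForStreamEnd could then wait until isComplete becomes true, which would replace the fixed delay with an exact end-of-output signal.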