Parallel Query Engine Customization

This article gives programmers more detail on customizing the Parallel Query Engine (PQE): how to override the default ooQuerySplitter and ooTaskAssigner, and how to define a subclass of ooQueryFilter. The function descriptions below are taken from the PQE Functional Specification.

The Parallel Query Engine is an API feature that is available in R9.2. The PQE is described in the Objectivity/DB C++ Programmer's Guide. This API will be available for Java and Python in later releases.

In order to utilize the Parallel Query Engine, the user will need to:

  • Start a Query Server process running on each database file server host. (This tool is described in the Objectivity/DB Administration manual.)
  • Modify the application source code to use the parallel variant of the scan function where appropriate for scans with FD or DB scope.
  • Optionally define a custom splitter and pass it in the scan call.
  • Optionally define a custom task assigner and register it in the program.
  • Optionally implement a Query Thread filter function, install it for the Query Server to use, and pass its name and arguments in the scan call.

The API scan function:

ooItr(appClass)::parallelScan( const ooRefHandle(ooObj)& storageObject, ooMode openMode, const char* predicate, ooQuerySplitter* splitter = NULL, const char* filterName = NULL, const void* filterData = NULL, int filterDataLength = 0)

This function initiates a predicate scan using the Parallel Query Engine. It prepares for the iterator's next member function to visit the instances of class appClass within storageObject that satisfy the given predicate and filter (if any). The parameters storageObject, openMode, and predicate have the same meaning as for the existing scan function for predicate scans, except that the storage object can only be a federation or database, not a container.

The added optional parameters are:

splitter: The query splitter to be used. A value of NULL designates the default splitter. The splitter class is documented below.
filterName: Name of a filter function to be applied to each qualified object by the Query Threads. If NULL, no filter will be applied. Further details about defining filter functions are given below.
filterData: Starting address of a block of data to be used by the filter function. A copy of the data will be transmitted to the Query Agent, which will pass the address of the copy to the filter. Because this data will be used in a different process, it is not meaningful for it to contain pointers. Even if a filter name is given, the filter data may still be NULL to indicate that there is no data to pass.
filterDataLength: Length, in bytes, of the filter data. This would typically be sizeof(structure) or strlen(string)+1. The length cannot be more than about 60K [exact limit to be determined after implementation].
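The filter data rules above (no pointers, length of strlen(string)+1, an upper bound of roughly 60K) can be illustrated with a small sketch. Everything in it is hypothetical and not part of the Objectivity/DB API: the TimeWindow structure, the helper names, and the kAssumedMaxFilterData constant, which is only a placeholder because the exact limit is not stated.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Placeholder only: the text gives the limit as "about 60K", exact value unknown.
const std::size_t kAssumedMaxFilterData = 60 * 1024;

// Hypothetical parameter block. It holds plain values and no pointers,
// so a byte-for-byte copy is still meaningful in another process.
struct TimeWindow {
    long long beginTime;
    long long endTime;
};

// Encode the parameters as text, which avoids byte-order issues
// between the client and the Query Server host.
std::string encodeTimeWindow(const TimeWindow& w) {
    return std::to_string(w.beginTime) + "," + std::to_string(w.endTime);
}

// Length to pass as filterDataLength for a string payload: strlen + 1,
// so that the terminating NUL is transmitted as well.
int stringFilterDataLength(const std::string& text) {
    std::size_t n = text.size() + 1;
    if (n > kAssumedMaxFilterData) {
        throw std::length_error("filter data exceeds the assumed limit");
    }
    return static_cast<int>(n);
}
```

With such helpers, the encoded string's c_str() and the computed length would be the values passed as filterData and filterDataLength in the parallelScan call.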

The MROW mode, lock wait time, server wait time, and non-quorum read enablement (from ooSession::setAllowNonQuorumRead()) of the current transaction will be transmitted to the Query Agent to be used by the transactions in the Query Threads so that their behavior will be as consistent as possible with that of a scan performed locally in the current thread. The Query Threads will use MROW mode either if the client thread does or if the client thread is doing an update transaction.

If the predicate expression uses any user-defined operators, those operator definitions will need to be installed on each Query Server. This can be done in function ooInitQueryFilter below.

Corresponding Java API:
      ooFDObj.parallelScan(String className);
      ooFDObj.parallelScan(String className, QuerySplitter qs);
      ooFDObj.parallelScan(String className, String predicate);
      ooFDObj.parallelScan(String className, String predicate, QuerySplitter qs);
      ooDBObj.parallelScan(String className);
      ooDBObj.parallelScan(String className, QuerySplitter qs);
      ooDBObj.parallelScan(String className, String predicate);
      ooDBObj.parallelScan(String className, String predicate, QuerySplitter qs);
Corresponding Python API:
      Itr DbObj.getParallelScanItr(className, predicate = '', filterName = '', \
				   filterData = '')
      Itr FdObj.getParallelScanItr(className, predicate = '', filterName = '', \
				   filterData = '')


Query Splitters

A query splitter is implemented by an instance of the non-persistence-capable class ooQuerySplitter, which is defined in header file "ooPQE.h". The virtual member functions of that class implement the default splitter; users can define derived classes that override the member functions to provide customized behavior. Derived classes will typically also hold data items, such as the limit values for a range-based splitter. For example, a typical usage might look like:
   ooItr(foo) itr;
   MyRangeSplitter splitter(begin_time, end_time);
   itr.parallelScan(scope_handle, oocRead, predicate, &splitter);

The class ooQuerySplitter has a constructor with no arguments, and the following virtual member functions:

Note: this is a preliminary concept that may be revised in later versions.
void initScan(const ooHandle(ooObj)& storageObject, ooTypeNumber typeN, const ooExpressionTree* predicate)

Before any of the other functions are called, this will be called to pass the storage object from the call of parallelScan and the type number of the class being iterated over. If a derived class overrides this method without overriding all of the methods, then it should call the base class method also.

The predicate argument will not be usable initially; in future releases it will be a pointer to the predicate expression in parse tree form. [The expression tree API will be part of the separate project for an enhanced Query Engine.]

void endScan()

This function will be called when the scan is ended (either after all of the data has been processed or when the iterator's end method or destructor is called). If the splitter instance was dynamically allocated, this would be a good place to delete it. An override method should also call the base class method.

ooBoolean nextRange(ooLongId & start, ooLongId & end)

This is the top-level function that actually implements the splitting of non-indexed scans. Each time it is called, it will set start and end to designate the inclusive range to be scanned by one query thread. Often the two OIDs will be the same, but they could be different to specify a range of containers to be scanned. The function returns true when the OIDs are set or false when there is no more data to be scanned.

The default method will utilize information from the Placement Manager (when that becomes available Post Phase 1) if the class is one that it controls. Otherwise, the default method calls nextDB (if the scan scope is the federation instead of a database) and nextCont (described below). The exact algorithm is a heuristic that will not be publicly documented.

[Note: The class ooLongId already exists in Release 9.0, but is not yet documented in the user manuals. For the purposes of the PQE API, it should be sufficient to know that it supports the same member functions and operators as ooId and is assignment-compatible with ooHandle(ooObj).]
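The contract of nextRange (set an inclusive start and end on each call, return false when nothing is left) can be sketched without the real headers. The code below is an illustration under stated assumptions, not the library's implementation: sketch::LongId and sketch::QuerySplitter are stand-ins for ooLongId and ooQuerySplitter, and ChunkedSplitter is a hypothetical custom splitter that hands out the containers of one database in fixed-size chunks.

```cpp
#include <algorithm>

namespace sketch {

// Stand-in for ooLongId: only database and container numbers are modelled.
struct LongId {
    unsigned db;
    unsigned cont;
};

// Stand-in for the three ooQuerySplitter methods a custom splitter overrides.
class QuerySplitter {
public:
    virtual ~QuerySplitter() {}
    virtual void initScan() {}
    virtual void endScan() {}
    virtual bool nextRange(LongId& start, LongId& end) = 0;
};

// Hypothetical splitter: covers containers [firstCont, lastCont] of one
// database, giving each query thread at most `chunk` containers.
class ChunkedSplitter : public QuerySplitter {
public:
    ChunkedSplitter(unsigned db, unsigned firstCont, unsigned lastCont, unsigned chunk)
        : db_(db), next_(firstCont), last_(lastCont), chunk_(chunk == 0 ? 1 : chunk) {}

    bool nextRange(LongId& start, LongId& end) override {
        if (next_ > last_) return false;              // no more data to scan
        unsigned hi = std::min(last_, next_ + chunk_ - 1);
        start = LongId{db_, next_};
        end = LongId{db_, hi};                        // inclusive upper bound
        next_ = hi + 1;
        return true;
    }

private:
    unsigned db_, next_, last_, chunk_;
};

}  // namespace sketch
```

A real derived class would take and fill ooLongId references instead; the sketch also assumes container numbers small enough that the arithmetic cannot overflow.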

Typically a custom splitter would be implemented by overriding the three methods above. Alternatively, the methods that follow can be overridden to modify the behavior of the default nextRange function.

ooBoolean applicableDB(const ooHandle(ooDBObj)& dbh, ooBoolean* filterContsPtr)

The input argument is a database handle designating an existing database. The function should return oocTrue if the database should be scanned, or oocFalse if it should be skipped. When the function returns true, it should also set *filterContsPtr to oocFalse if the whole database should be scanned or to oocTrue if only selected containers are to be scanned.
The default method always returns true and sets *filterContsPtr to oocFalse.

ooBoolean applicableCont(const ooHandle(ooContObj)& ch)

The argument is a container handle designating an existing container in a database for which applicableDB returned true. The function should return oocTrue if the container should be scanned, or oocFalse if it should be skipped.
The default method ignores the argument and always returns true.
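As a rough illustration of how applicableDB and applicableCont work together, the sketch below keeps a table of databases and containers to scan. It is a stand-in under stated assumptions: plain numbers replace ooHandle(ooDBObj) and ooHandle(ooContObj), bool replaces ooBoolean, and SelectiveSplitter with its allowDatabase and allowContainer helpers is hypothetical.

```cpp
#include <map>
#include <set>

namespace sketch {

// Stand-in for a container handle: database number plus container number.
struct ContId {
    unsigned db;
    unsigned cont;
};

class SelectiveSplitter {
public:
    // An empty container set means "scan the whole database".
    void allowDatabase(unsigned db) { (void)wanted_[db]; }
    void allowContainer(unsigned db, unsigned cont) { wanted_[db].insert(cont); }

    // Mirrors applicableDB: returns true if the database should be scanned
    // and reports whether its containers must be filtered individually.
    bool applicableDB(unsigned db, bool* filterContsPtr) const {
        std::map<unsigned, std::set<unsigned> >::const_iterator it = wanted_.find(db);
        if (it == wanted_.end()) return false;        // skip this database
        *filterContsPtr = !it->second.empty();        // true: only selected containers
        return true;
    }

    // Mirrors applicableCont: consulted for containers of applicable databases.
    bool applicableCont(const ContId& c) const {
        std::map<unsigned, std::set<unsigned> >::const_iterator it = wanted_.find(c.db);
        if (it == wanted_.end()) return false;
        return it->second.empty() || it->second.count(c.cont) > 0;
    }

private:
    std::map<unsigned, std::set<unsigned> > wanted_;
};

}  // namespace sketch
```

Keeping the selection in a lookup table makes both checks cheap, which matters because they may be called once per database or container during the scan.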

ooBoolean nextDB(ooHandle(ooDBObj)& dbh, ooBoolean* filterContsPtr)

The database number in the handle will be set to the next database to be scanned after the one designated by the value of the handle on entry. Returns oocTrue if the handle has been updated with another database to be scanned, or oocFalse if there are no more applicable databases. When the function returns true, it should also set *filterContsPtr to oocFalse if the whole database should be scanned or to oocTrue if only selected containers are to be scanned. The first time this is called, the handle will be null on entry and should be set to the first applicable database. This function will be called only if the scan storage object is a federation handle instead of a database handle.

The default method sets the database handle to the next database number (starting from 2) which exists and for which applicableDB returns true. The filterContsPtr is passed to applicableDB.

[Design note: it may appear redundant to have both applicableDB and nextDB, but applicableDB is needed for splitting indexed scans but may not be sufficiently efficient by itself for splitting non-indexed scans.]

ooBoolean nextCont(ooHandle(ooContObj)& conth)

The container number in the handle will be set to the next container to be scanned after the one designated by the value of the handle on entry. Returns oocTrue if the handle has been updated with another container to be scanned, or oocFalse if there are no more applicable containers in the same database. The first time this is called, the handle will have the database number but the container number will be 0 on entry and should be set to the first applicable container in that database. This function might not be called on databases for which nextDB has set *filterContsPtr to false.

The default method sets the container handle to the next container number (starting from 2) which exists and for which applicableCont returns true.
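The default nextDB and nextCont behavior described above follows one pattern: starting from number 2, advance to the next number that exists and passes the matching applicable check. A minimal sketch of that pattern follows, assuming the existing numbers are available as a sorted set and using 0 to mean "no more"; the function name and both conventions are illustrative only, not the library's code.

```cpp
#include <functional>
#include <set>

namespace sketch {

// Returns the next existing number after `current` (never below 2) for
// which `applicable` returns true, or 0 when none remains.
// `current` == 0 models the null handle passed on the first call.
unsigned nextApplicable(const std::set<unsigned>& existing,
                        unsigned current,
                        const std::function<bool(unsigned)>& applicable) {
    unsigned from = current < 2 ? 2 : current + 1;
    for (std::set<unsigned>::const_iterator it = existing.lower_bound(from);
         it != existing.end(); ++it) {
        if (applicable(*it)) return *it;
    }
    return 0;
}

}  // namespace sketch
```

In an override, the returned number would be stored into the database or container handle, and the function would return oocTrue or oocFalse accordingly.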

ooBoolean applicableObject(const ooLongId & objId)

Given the OID of an object, this function returns true if the OID should be passed to a Query Thread to apply the predicate and filter, or false if the object should be skipped. This would be used for splitting indexed scans or scalable collection scans (Post Phase 1). The default method calls applicableDB and, if necessary, applicableCont.

Note that splitter methods will be called from the Query Manager thread, in the context of its own read-only transaction. Therefore, they can't share handles with the main application code and won't see any changes to persistent data made in the application's current transaction. Any sharing of data with the application code should be done with the awareness that there will be access from two asynchronous threads. A splitter instance can be used by only one iteration at a time and must not be deleted before that iteration is terminated.

Java API:
      void initScan(ooId obj, long typeNumber, ExpressionTree expr)
      boolean nextRange(ooIdPQ start, ooIdPQ end)
      void endScan()
      boolean applicableDB(ooId db)
      boolean applicableCont(ooId cont)
      ooId nextDB(ooId db)
      ooId nextCont(ooId cont)
      boolean filterContainers(ooId db)
      boolean applicableObject(ooId oid)

Corresponding Python API:
     Splitters are not (yet) implemented for Python

Task Assigners

A task assigner is an instance of the following class:

  class ooTaskAssigner {
  public:
    virtual const char* assignTask(const ooLongId & oid);
  };
Given the starting OID of a range to be searched, the assignTask function should return the host name of the Query Agent that should perform the search. If NULL is returned, an in-process Query Agent will be used (Post Phase 1). Otherwise, the task will be passed to the Query Server process running on the designated host.

The default task assigner will return the host on which the database file resides. For a replicated database, it will check which of the available replicas have a Query Server running on the host of the image file; if there is more than one, the tasks will be automatically divided among them. Users may define a derived class to override the default behavior.

To cause a user-defined task assigner to be used, it needs to be registered by passing it as the argument to the following static member function:

  void ooObjy::setTaskAssigner(ooTaskAssigner* assigner)
The currently registered instance can be obtained by calling:
  ooTaskAssigner* ooObjy::getTaskAssigner()
Note that task assigner registration is global for the process, not specific to the current session.

The following function is also provided:

ooBoolean ooObjy::checkQueryServer(const char * hostName);

Returns true if a Query Server is running on the host named by the argument. A null argument value tests for the availability of an in-process Query Agent (false in Phase 1).

This can be used by a task assigner to avoid returning a host that doesn't actually have a query server running.
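The idea of checking for a running query server before returning a host can be sketched as follows. This is a stand-in, not the real classes: PreferredHostAssigner is hypothetical, a plain database number replaces the ooLongId argument, and the isUp callback stands in for ooObjy::checkQueryServer so the example does not depend on the Objectivity/DB headers.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

class PreferredHostAssigner {
public:
    PreferredHostAssigner(std::vector<std::string> hosts,
                          std::function<bool(const std::string&)> isUp)
        : hosts_(std::move(hosts)), isUp_(std::move(isUp)) {}

    // Spreads tasks over the configured hosts by database number and skips
    // hosts without a running query server. Returns a null pointer if no
    // host qualifies; in the real API NULL selects an in-process Query
    // Agent, which the text says is not available in Phase 1.
    const char* assignTask(unsigned dbNumber) const {
        const std::size_t n = hosts_.size();
        for (std::size_t i = 0; i < n; ++i) {
            const std::string& host = hosts_[(dbNumber + i) % n];
            if (isUp_(host)) return host.c_str();
        }
        return nullptr;
    }

private:
    std::vector<std::string> hosts_;   // owns the strings returned above
    std::function<bool(const std::string&)> isUp_;
};

}  // namespace sketch
```

The returned pointer refers to a string owned by the assigner, so the assigner has to outlive any use of the host name. A real subclass would be registered with ooObjy::setTaskAssigner.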

Java API:
      New class TaskAssigner:
            String assignTask(ooId oid)

      New API in Connection class:
            TaskAssigner getTaskAssigner()
            void setTaskAssigner(TaskAssigner ta)
            boolean checkQueryServer(String hostname)
Corresponding Python API:
      TaskAssigner is not (yet) implemented for Python


Query Filters

[ Note: this section is a short-term design that is likely to become obsolete in Release 10. ]
The Query Agent uses a shared library called "" (or "liboo_qsfilter.dll" on Windows) which contains dummy definitions of the following two functions. Users can replace this library with their own implementation of these functions in order to provide server-side filter functions.

void ooInitQueryFilter ()

This function will be called just once per process as a place to perform any necessary initialization. The default implementation does nothing. This could be used to install custom predicate operators.

ooQueryFilter* ooNewQueryFilter( const char* filterName, ooTypeNumber typeN, const char* predicate, const void* filterData, int filterDataLength, ooBoolean swapBytes)

This function will be called once at the beginning of a scan which specified a non-null filter name. The typeN and predicate parameters describing the scan are also passed in case they might be useful. It would be prudent to verify that the type number is not something unexpected in order to avoid having the filter doing an inappropriate type cast on the received object.

The filterData and filterDataLength parameters are a copy of the filter data from the parallelScan call. This function should perform whatever parsing or conversion is needed on the data. If swapBytes is true, the data is from a machine architecture with opposite byte order, so this function will need to perform any necessary byte swapping. To avoid cross-platform difficulties, it may be best to encode the data as a character string instead of passing binary data structures; in that case, the swapBytes parameter can be ignored.
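If binary filter data is used anyway, the byte swapping mentioned above might look like the following sketch. The helper names swap32 and readU32 are hypothetical, and the example assumes the payload contains 32-bit unsigned fields at known offsets.

```cpp
#include <cstdint>
#include <cstring>

namespace sketch {

// Reverses the byte order of a 32-bit value.
inline std::uint32_t swap32(std::uint32_t v) {
    return ((v & 0x000000FFu) << 24) | ((v & 0x0000FF00u) << 8) |
           ((v & 0x00FF0000u) >> 8)  | ((v & 0xFF000000u) >> 24);
}

// Reads one 32-bit field from the received filter data, swapping its bytes
// when the sender has the opposite byte order. Returns false if the block
// is missing or too short to contain the field.
bool readU32(const void* data, int length, int offset, bool swapBytes,
             std::uint32_t& out) {
    if (data == nullptr || offset < 0 || length < 0 || offset > length - 4) {
        return false;
    }
    // memcpy avoids alignment problems with the raw buffer.
    std::memcpy(&out, static_cast<const unsigned char*>(data) + offset, sizeof out);
    if (swapBytes) out = swap32(out);
    return true;
}

}  // namespace sketch
```

Every multi-byte field needs the same treatment, which is one reason a character-string encoding is simpler.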

This function should create and return a new instance of a user-defined concrete subclass of the abstract class ooQueryFilter (below), initializing the data members with whatever information will be needed from this function's parameters.

If the filter name is not recognized as one that is being supported, the function should return NULL. Other errors could be reported by throwing an error message string as an exception. For example:

  throw "Unsupported type number";

The default implementation ignores the parameters and always returns NULL.

This function will not be called within a transaction and will not have an attached session, so it is not allowed to do any database operations.
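The factory behavior described above (NULL for an unrecognized name, a thrown message string for other errors, a new filter instance otherwise) can be sketched with stand-in types. The code is illustrative only: sketch::QueryFilter stands in for ooQueryFilter and receives a plain int instead of an object handle, and the "minValue" filter name, the kExpectedType number, and MinValueFilter are hypothetical.

```cpp
#include <cstdlib>
#include <cstring>
#include <string>

namespace sketch {

typedef unsigned TypeNumber;             // stand-in for ooTypeNumber
const TypeNumber kExpectedType = 1001;   // hypothetical type number

class QueryFilter {                      // stand-in for ooQueryFilter
public:
    virtual ~QueryFilter() {}
    virtual bool filter(int objectValue) = 0;
};

class MinValueFilter : public QueryFilter {
public:
    explicit MinValueFilter(int minValue) : min_(minValue) {}
    bool filter(int objectValue) override { return objectValue >= min_; }
private:
    const int min_;                      // set once, never modified afterwards
};

// Mirrors the parameter list of ooNewQueryFilter; the predicate and
// swapBytes parameters are unused because the data is sent as text.
QueryFilter* newQueryFilter(const char* filterName, TypeNumber typeN,
                            const char* /*predicate*/, const void* filterData,
                            int filterDataLength, bool /*swapBytes*/) {
    if (filterName == nullptr || std::strcmp(filterName, "minValue") != 0) {
        return nullptr;                  // name not supported by this library
    }
    if (typeN != kExpectedType) throw "Unsupported type number";
    if (filterData == nullptr || filterDataLength <= 0) throw "Missing filter data";

    // Copy into a std::string so parsing never reads past the received block.
    std::string text(static_cast<const char*>(filterData),
                     static_cast<std::size_t>(filterDataLength));
    char* end = nullptr;
    long value = std::strtol(text.c_str(), &end, 10);
    if (end == text.c_str()) throw "Malformed filter data";
    return new MinValueFilter(static_cast<int>(value));
}

}  // namespace sketch
```

Checking the type number before constructing the filter follows the advice above about avoiding an inappropriate cast on the received object.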

The abstract base class for filters is defined in header file "ooPQE.h" as:
  class ooQueryFilter {
  public:
      virtual ooBoolean filter(ooHandle(ooObj)& objH) = 0;
      virtual ~ooQueryFilter() = 0;
  };
The filter member function will be called with an open handle to an object which satisfies the predicate. It should return oocTrue if the object is to be passed back to the application program, or oocFalse if it should be skipped. If an exception is thrown out of the function, the scan will be aborted and an exception will be thrown in the client application thread during a subsequent call to next. The filter will be called in the context of a read-only transaction, so it can open associated objects as needed to do its testing, but it should not attempt to change the transaction state (such as by commit or upgrade).

Note that the same filter object will be used by filter function calls in multiple threads, so its data members shouldn't be modified by the filter function unless appropriate care is taken to do it in a thread-safe way. There could, however, be more than one filter object at the same time, where each is serving a different client.
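One way to respect the thread-safety requirement is to keep configuration members constant and to use atomic types for anything the filter function updates. The sketch below assumes a hypothetical CountingFilter that is not derived from the real ooQueryFilter and that receives a plain int instead of an object handle.

```cpp
#include <atomic>

namespace sketch {

class CountingFilter {
public:
    explicit CountingFilter(int minValue)
        : min_(minValue), seen_(0), passed_(0) {}

    // May be called concurrently from several query threads.
    bool filter(int objectValue) {
        seen_.fetch_add(1, std::memory_order_relaxed);
        if (objectValue < min_) return false;         // skip this object
        passed_.fetch_add(1, std::memory_order_relaxed);
        return true;                                  // pass back to the client
    }

    long seen() const { return seen_.load(); }
    long passed() const { return passed_.load(); }

private:
    const int min_;              // read-only after construction, safe to share
    std::atomic<long> seen_;     // updated atomically, no lock needed
    std::atomic<long> passed_;
};

}  // namespace sketch
```

Relaxed ordering is enough here because the counters are independent statistics; state with invariants spanning several members would need a mutex instead.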

At the completion of the scan, the filter object will be deleted by the Query Agent. As with the constructor, the destructor call will not be within a transaction.

Note that even if the application program is written in Java, any necessary filter functions will still need to be written in C++.

Corresponding Python API:
      Filters are not (yet) implemented for Python
Monday, October 29, 2012