PQE Customization


The purpose of this article is to provide programmers with information about customizing certain aspects of a parallel query.

Objectivity/DB language bindings have supported the parallel query feature since R9.2. Standard parallel queries are described in the documentation for Objectivity/C++, Objectivity for Java, Objectivity/.NET for C#, and Objectivity/Python.

To perform a standard parallel query, you:

  • Run a query server on each data-server host on which a database file resides. (The query server is described in the Objectivity/DB Administration manual.)
  • Modify the application source code to use the parallel variant of the scan method where appropriate. See Method for Performing a Parallel Scan below.

You can customize a parallel query by:

  • Optionally implementing one or more custom query operators and providing the implementation as a plugin to each query server, as described in the Custom Operator Support topic.
  • Optionally defining a custom splitter and passing it to the method invoking the parallel scan, as described in Query Splitters below.
  • Optionally defining a custom task assigner and registering it with the program, as described in Task Assigners below.
  • Optionally starting an in-process query server from within the querying application, as described in In-Process Query Server below.
Note: Custom task assigners and in-process query servers can also be used to customize navigation queries that use the distributed-navigation policy. See objy::ra::navigation::Navigator in the Objectivity/C++ Programmer's Reference.

 

Method for Performing a Parallel Scan

Note: Parallel scans are supported in Objectivity/C++, Objectivity for Java, Objectivity/Python, and Objectivity/.NET for C#.
In Objectivity/C++:
ooStatus
ooItr(appClass)::parallelScan( const ooRefHandle(ooObj)& storageObject, ooMode openMode, const char* predicate, ooQuerySplitter* splitter = NULL);

This method initiates a predicate scan as a parallel query. It prepares for the iterator's next method to visit the instances of class appClass within storageObject that satisfy the given predicate. The parameters storageObject, openMode, and predicate have the same meaning as for the existing scan method for predicate scans, except that the storage object is restricted as follows:

  • In an application accessing a placement-managed federated database, the scope object must be the federation itself.
  • In an application accessing a non-placement-managed federated database, the scope object may be either the federation or a particular database (but not a container).
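The restriction on the storage object can be restated as a small predicate. This is a standalone sketch, not Objectivity code: ScopeKind and isValidParallelScope are hypothetical names used only to summarize the two rules above.

```cpp
#include <cassert>

// Hypothetical stand-in for the kind of storage object passed to parallelScan.
enum class ScopeKind { Federation, Database, Container };

// Returns true if `scope` is an acceptable storage object for a parallel scan,
// following the two rules listed above.
bool isValidParallelScope(bool placementManaged, ScopeKind scope) {
    if (placementManaged) {
        // Placement-managed federated database: only the federation itself.
        return scope == ScopeKind::Federation;
    }
    // Non-placement-managed: the federation or one database, never a container.
    return scope != ScopeKind::Container;
}
```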

The optional parameter splitter specifies a custom query splitter. A value of NULL designates the default splitter. The splitter class is described below.

The current transaction's MROW mode, lock wait time, server wait time, and non-quorum read property (from ooSession::setAllowNonQuorumRead()) will be used by the transactions in the query threads, so that their behavior will be as consistent as possible with that of a scan performed locally in the current thread. The query threads will use MROW mode either if the client thread does or if the client thread is doing an update transaction.
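The rule in the last sentence reduces to a logical OR. The ClientTxn struct and the queryThreadsUseMrow function below are hypothetical, used only to restate that rule:

```cpp
#include <cassert>

// Hypothetical summary of the client thread's current transaction.
struct ClientTxn {
    bool mrow;    // the client transaction uses MROW mode
    bool update;  // the client transaction is an update transaction
};

// Query threads use MROW if the client does, or if the client is updating.
bool queryThreadsUseMrow(const ClientTxn& txn) {
    return txn.mrow || txn.update;
}
```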

If the predicate expression uses any custom operators, those operator definitions must be provided as a plugin, as described in the Custom Operator Support topic.

In Objectivity for Java:
      ooFDObj.parallelScan(String className, String predicate);
      ooFDObj.parallelScan(String className, String predicate, QuerySplitter qs);
      ooFDObj.parallelScan(String className, ObjectQualifier oq);
      ooFDObj.parallelScan(String className, ObjectQualifier oq, QuerySplitter qs);  
      ooDBObj.parallelScan(String className, String predicate);   
      ooDBObj.parallelScan(String className, String predicate, QuerySplitter qs); 
      ooDBObj.parallelScan(String className, ObjectQualifier oq);     
      ooDBObj.parallelScan(String className, ObjectQualifier oq, QuerySplitter qs);  
Note: The ooDBObj methods are for backward compatibility only in an application that accesses a non-placement-managed federated database.
In Objectivity/Python:
      Itr FdObj.getParallelScanItr(className, predicate = '', filterName = '', \
				   filterData = '')
      Itr DbObj.getParallelScanItr(className, predicate = '', filterName = '', \
				   filterData = '')

 

You should omit the filterName and filterData parameters.

Note: The DbObj method is for backward compatibility only in an application that accesses a non-placement-managed federated database.
In Objectivity/.NET for C#:
Federation class:
  public IEnumerable<T> ParallelScan<T>(
  	string predicate
  )
  where T : IReferenceableObject

 

Query Splitters

Note: This section applies only to applications that access non-placement-managed federated databases.
Note: Query splitters are supported only by Objectivity/C++ and Objectivity for Java.

A query splitter is implemented by an instance of the non-persistence-capable class ooQuerySplitter, which is defined in the header file "ooPQE.h". The virtual methods of that class implement the default splitter; you can define derived classes that override those methods to customize its behavior. Derived classes will typically also hold data items, such as the limit values for a range-based splitter. For example, a typical Objectivity/C++ usage might look like:

   ooItr(foo) itr;
   MyRangeSplitter splitter(begin_time, end_time);
   itr.parallelScan(scope_handle, oocRead, predicate, &splitter);
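The body of MyRangeSplitter is not shown. As a minimal sketch of the kind of data such a class might hold, the standalone class below keeps the begin and end times and skips databases whose time window cannot overlap them. It assumes the application knows each database's time window (DbTimeWindows is a hypothetical lookup table), and it neither derives from ooQuerySplitter nor includes ooPQE.h; in a real splitter, similar logic would go in an applicableDB override.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>

using TimeStamp = std::int64_t;

// Hypothetical metadata: the closed time window [first, last] covered by each
// database, keyed by database number. Where this comes from is up to the application.
using DbTimeWindows = std::map<unsigned, std::pair<TimeStamp, TimeStamp>>;

// Sketch of the state a range-based splitter such as MyRangeSplitter might hold,
// and the test an applicableDB override could apply with it.
class RangeSplitterSketch {
public:
    RangeSplitterSketch(TimeStamp begin, TimeStamp end, DbTimeWindows windows)
        : begin_(begin), end_(end), windows_(std::move(windows)) {
        assert(begin_ <= end_);
    }

    // True if database `dbNumber` may hold data in [begin_, end_].
    bool applicableDB(unsigned dbNumber) const {
        auto it = windows_.find(dbNumber);
        if (it == windows_.end()) return true;  // unknown window: scan it to be safe
        const std::pair<TimeStamp, TimeStamp>& w = it->second;
        return w.first <= end_ && begin_ <= w.second;  // closed intervals overlap
    }

private:
    TimeStamp begin_;
    TimeStamp end_;
    DbTimeWindows windows_;
};
```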
In Objectivity/C++:

The class ooQuerySplitter has a constructor with no arguments, and the following virtual methods:

void initScan(const ooHandle(ooObj)& storageObject, ooTypeNumber typeN, const ooExpressionTree* predicate)

This method is called before any of the other methods, to pass the storage object from the call to parallelScan and the type number of the class being iterated over. If a derived class overrides this method without overriding all of the other methods, it should also call the base class method.

The predicate argument is a pointer to the predicate expression in parse tree form.

void endScan()

This method is called when the scan ends (either after all of the data has been processed or when the iterator's end method or destructor is called). If the splitter instance was dynamically allocated, this is a good place to delete it. An overriding method should also call the base class method.
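The advice to delete a dynamically allocated splitter in endScan can be illustrated with a standalone sketch. SplitterBase stands in for ooQuerySplitter (it is not the real class), and SelfDeletingSplitter is hypothetical. The private constructor forces heap allocation, because delete this on a stack object would be undefined behavior. Since endScan also runs from the iterator's end method or destructor, the application must not use the splitter pointer after the scan ends.

```cpp
#include <cassert>

// Hypothetical stand-in for ooQuerySplitter; not the real class from ooPQE.h.
class SplitterBase {
public:
    virtual ~SplitterBase() = default;
    virtual void endScan() { /* base-class cleanup would run here */ }
};

int g_splittersDestroyed = 0;  // counts destructions so the effect is observable

// A splitter that can only live on the heap and frees itself in endScan.
class SelfDeletingSplitter : public SplitterBase {
public:
    static SelfDeletingSplitter* create() { return new SelfDeletingSplitter(); }
    ~SelfDeletingSplitter() override { ++g_splittersDestroyed; }

    void endScan() override {
        SplitterBase::endScan();  // call the base class method first, as advised
        delete this;              // do not touch any member after this line
    }

private:
    SelfDeletingSplitter() = default;  // private: forces allocation through create()
};
```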

ooBoolean nextRange(ooLongId & start, ooLongId & end)

This is the top-level method that actually implements the splitting of non-indexed scans. Each time it is called, it will set start and end to designate the inclusive range to be scanned by one query thread. Often the two OIDs will be the same, but they could be different to specify a range of containers to be scanned. The method returns true when the OIDs are set or false when there is no more data to be scanned.

The default method calls nextDB (if the scan scope is the federation instead of a database) and nextCont (described below). The exact algorithm is a heuristic that will not be publicly documented.

[Note: The class ooLongId is not documented in the user manuals. For the purposes of implementing a query splitter, it should be sufficient to know that it supports the same methods and operators as ooId and is assignment-compatible with ooHandle(ooObj).]
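To illustrate the nextRange contract (inclusive ranges, true while data remains, false when done), here is a standalone sketch that groups consecutive containers of one database into batches. ContId stands in for ooLongId and BatchRangeSplitter is a hypothetical name; this is not the undocumented default heuristic. Because a range covers every container number between its two ends, the list should hold all containers of the database rather than a filtered subset.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for ooLongId: just a container number in one database.
using ContId = unsigned;

// Sketch of a nextRange-style producer that hands out inclusive ranges of up to
// `batch` consecutive entries from a sorted list of container numbers.
class BatchRangeSplitter {
public:
    BatchRangeSplitter(std::vector<ContId> sortedConts, std::size_t batch)
        : conts_(std::move(sortedConts)), batch_(batch) {
        assert(batch_ > 0);
        assert(std::is_sorted(conts_.begin(), conts_.end()));
    }

    // Sets [start, end] and returns true, or returns false when nothing is left.
    bool nextRange(ContId& start, ContId& end) {
        if (pos_ >= conts_.size()) return false;
        std::size_t last = std::min(pos_ + batch_, conts_.size()) - 1;
        start = conts_[pos_];
        end = conts_[last];
        pos_ = last + 1;
        return true;
    }

private:
    std::vector<ContId> conts_;
    std::size_t batch_;
    std::size_t pos_ = 0;
};
```

A caller would drive it the way the text describes for nextRange: call it in a loop until it returns false, handing each [start, end] range to one query thread.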

Typically a custom splitter would be implemented by overriding the three methods above. Alternatively, the methods that follow can be overridden to modify the behavior of the default nextRange method.

ooBoolean applicableDB(const ooHandle(ooDBObj)& dbh, ooBoolean* filterContsPtr)

The input argument is a database handle designating an existing database. The method should return oocTrue if the database should be scanned, or oocFalse if it should be skipped. When the method returns true, it should also set *filterContsPtr to oocFalse if the whole database should be scanned or to oocTrue if only selected containers are to be scanned.
The default method always returns true and sets *filterContsPtr to oocFalse.

ooBoolean applicableCont(const ooHandle(ooContObj)& ch)

The argument is a container handle designating an existing container in a database for which applicableDB returned true. The method should return oocTrue if the container should be scanned, or oocFalse if it should be skipped.
The default method ignores its argument and always returns true.
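The interplay between applicableDB, *filterContsPtr, and applicableCont can be sketched with plain C++ types. The class below is hypothetical: it uses bool instead of ooBoolean, database and container numbers instead of handles, and a hard-coded policy chosen only for illustration.

```cpp
#include <cassert>
#include <set>

// Hypothetical policy: scan database 5 completely, scan only containers 3 and 4
// of database 7, and skip every other database.
class SelectiveSplitterSketch {
public:
    // Mirrors applicableDB: whether to scan `db`; when returning true it also
    // sets *filterConts to say whether only selected containers are scanned.
    bool applicableDB(unsigned db, bool* filterConts) const {
        if (db == 5) { *filterConts = false; return true; }  // whole database
        if (db == 7) { *filterConts = true; return true; }   // selected containers
        return false;                                         // skip database
    }

    // Mirrors applicableCont; its answer matters here only for database 7,
    // where container filtering was requested.
    bool applicableCont(unsigned db, unsigned cont) const {
        return db == 7 && selected_.count(cont) != 0;
    }

private:
    std::set<unsigned> selected_{3, 4};
};
```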

ooBoolean nextDB(ooHandle(ooDBObj)& dbh, ooBoolean* filterContsPtr)

The database number in the handle will be set to the next database to be scanned after the one designated by the value of the handle on entry. Returns oocTrue if the handle has been updated with another database to be scanned, or oocFalse if there are no more applicable databases. When the method returns true, it should also set *filterContsPtr to oocFalse if the whole database should be scanned or to oocTrue if only selected containers are to be scanned. The first time this is called, the handle will be null on entry and should be set to the first applicable database. This method will be called only if the scan storage object is a federation handle instead of a database handle.

The default method sets the database handle to the next database number (starting from 2) which exists and for which applicableDB returns true. The filterContsPtr is passed to applicableDB.

[Design note: It may appear redundant to have both applicableDB and nextDB, but applicableDB is needed for splitting indexed scans and may not be efficient enough by itself for splitting non-indexed scans.]
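The default nextDB behavior described above can be modeled in a few lines. In this standalone sketch a database handle is reduced to its number (0 standing for a null handle), the existing databases are given as a std::set, and applicableDB is passed in as a callback; defaultNextDB and its parameters are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <set>

// Hypothetical stand-ins: a database "handle" is just its number (0 means null),
// and the federation's existing databases are a set of numbers.
using DbNumber = unsigned;
using ApplicableDbFn = std::function<bool(DbNumber, bool*)>;

// Models the documented default nextDB: advance to the next existing database
// number (starting from 2) for which applicableDB returns true.
bool defaultNextDB(const std::set<DbNumber>& existing,
                   const ApplicableDbFn& applicableDB,
                   DbNumber& db, bool* filterConts) {
    // On the first call db is 0 (null), so the search starts at database 2.
    DbNumber from = (db + 1 < 2) ? 2 : db + 1;
    for (auto it = existing.lower_bound(from); it != existing.end(); ++it) {
        if (applicableDB(*it, filterConts)) {  // filterConts is passed through
            db = *it;
            return true;
        }
    }
    return false;  // no more applicable databases; db is left unchanged
}
```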

ooBoolean nextCont(ooHandle(ooContObj)& conth)

The container number in the handle will be set to the next container to be scanned after the one designated by the value of the handle on entry. Returns oocTrue if the handle has been updated with another container to be scanned, or oocFalse if there are no more applicable containers in the same database. The first time this is called, the handle will have the database number but the container number will be 0 on entry and should be set to the first applicable container in that database. This method might not be called for databases for which nextDB has set *filterContsPtr to oocFalse.

The default method sets the container handle to the next container number (starting from 2) which exists and for which applicableCont returns true.
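Putting nextDB and nextCont together, the sketch below enumerates the units of work for a whole federation. It simplifies on purpose: an unfiltered database becomes a single task, whereas the actual default nextRange heuristic is undocumented and may split it differently. Layout, Task, and enumerateTasks are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Hypothetical federation layout: database number -> its existing container numbers.
using Layout = std::map<unsigned, std::set<unsigned>>;

// One unit of work in this sketch: a whole database (cont == 0) or one container.
struct Task {
    unsigned db;
    unsigned cont;
};

// Walks databases from number 2 upward, as the default nextDB does. An
// unfiltered database becomes one whole-database task; for a filtered one,
// each applicable container (from number 2, as in the default nextCont) is a task.
template <class DbPred, class ContPred>
std::vector<Task> enumerateTasks(const Layout& layout, DbPred applicableDB,
                                 ContPred applicableCont) {
    std::vector<Task> tasks;
    for (const auto& entry : layout) {
        const unsigned db = entry.first;
        bool filterConts = false;
        if (db < 2 || !applicableDB(db, &filterConts)) continue;
        if (!filterConts) {
            tasks.push_back({db, 0});  // no container enumeration needed
            continue;
        }
        for (unsigned cont : entry.second) {
            if (cont >= 2 && applicableCont(db, cont)) tasks.push_back({db, cont});
        }
    }
    return tasks;
}
```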

ooBoolean applicableObject(const ooLongId & objId)

Given the OID of an object, this method returns true if the OID should be passed to a query thread to apply the predicate and filter, or false if the object should be skipped. This would be used for splitting indexed scans or scalable collection scans. The default method calls applicableDB and, if necessary, applicableCont.
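The default applicableObject can be modeled as follows. ObjId is a hypothetical stand-in for ooLongId that exposes the database and container numbers an OID encodes (plus page and slot, unused here). The base class supplies the documented defaults, and the hypothetical derived class shows an override.

```cpp
#include <cassert>

// Hypothetical stand-in for ooLongId: database, container, page, and slot numbers.
struct ObjId {
    unsigned db, cont, page, slot;
};

// Minimal splitter with the two predicates applicableObject builds on.
class ObjectFilterSketch {
public:
    virtual ~ObjectFilterSketch() = default;
    virtual bool applicableDB(unsigned /*db*/, bool* filterConts) const {
        *filterConts = false;  // default: whole database, no container filtering
        return true;
    }
    virtual bool applicableCont(unsigned /*db*/, unsigned /*cont*/) const { return true; }

    // Models the documented default: consult applicableDB, then applicableCont
    // only if that database asked for container filtering.
    bool applicableObject(const ObjId& oid) const {
        bool filterConts = false;
        if (!applicableDB(oid.db, &filterConts)) return false;
        return !filterConts || applicableCont(oid.db, oid.cont);
    }
};

// Example override: scan only container 4 of database 6; skip other databases.
class Db6Cont4Only : public ObjectFilterSketch {
public:
    bool applicableDB(unsigned db, bool* filterConts) const override {
        *filterConts = true;
        return db == 6;
    }
    bool applicableCont(unsigned /*db*/, unsigned cont) const override { return cont == 4; }
};
```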

Note that splitter methods will be called from the query-manager thread, in the context of its own read-only transaction. Therefore, they can't share handles with the main application code and won't see any changes to persistent data made in the application's current transaction. Any sharing of data with the application code should be done with the awareness that there will be access from two asynchronous threads. A splitter instance can be used by only one iteration at a time and must not be deleted before that iteration is terminated.
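One way to respect these threading rules when the application needs to influence a running splitter is to share only transient, thread-safe state. In this hypothetical sketch the application lowers a database-number limit through a std::atomic, and the splitter reads it on the query-manager thread. SharedScanLimits and LimitedSplitterSketch are illustrative names, and a lowered limit presumably affects only decisions the splitter makes after the change.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical transient (non-persistent) state written by the application thread
// and read by the splitter on the query-manager thread. std::atomic makes each
// individual read and write race-free without a lock.
struct SharedScanLimits {
    std::atomic<std::uint32_t> maxDb{std::numeric_limits<std::uint32_t>::max()};
};

class LimitedSplitterSketch {
public:
    explicit LimitedSplitterSketch(const SharedScanLimits& limits) : limits_(limits) {}

    // applicableDB-style check, run on the query-manager thread.
    bool applicableDB(std::uint32_t db) const { return db <= limits_.maxDb.load(); }

private:
    const SharedScanLimits& limits_;  // must outlive the iteration using this splitter
};
```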

 

In Objectivity for Java:

Class: com.objy.db.app.QuerySplitter

      void initScan(ooId obj, long typeNumber, ExpressionTree expr)
      boolean nextRange(ooIdPQ start, ooIdPQ end)
      void endScan()
      boolean applicableDB(ooId db)
      boolean applicableCont(ooId cont)
      ooId nextDB(ooId db)
      ooId nextCont(ooId cont)
      boolean filterContainers(ooId db)
      boolean applicableObject(ooId oid)

Task Assigners

Note: Task assigners are supported only by Objectivity/C++ and Objectivity for Java.

A task assigner determines which query servers will execute which query tasks. That is, for each database or container to be searched, the task assigner identifies the particular query server that will perform the search.

The default task assigner returns the host on which the database to be searched resides. (For a replicated database, the task assigner will check which of the available replicas have a query server running on the host of the image file; if there is more than one, the tasks will be automatically divided among them.)

You can define a derived task-assigner class to override the default behavior. For each query task to be performed (that is, for each database or container to be searched), your custom task assigner should designate a group of one or more query-server hosts that are suitable for searching that data. A suitable query-server host is typically the network node on which the database file resides, or else a nearby network node. When multiple query-server hosts are designated for a particular search task, the task assigner will automatically choose whichever is currently the least busy in order to balance the load.
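The load balancing within a group is done automatically, and the text does not say how "busy" is measured. As an illustration only, the function below assumes busyness is the number of active tasks per host and picks the least busy candidate; pickLeastBusy is a hypothetical name.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Picks the candidate host with the fewest active tasks. Hosts missing from
// `activeTasks` count as idle; ties go to the earlier candidate.
std::string pickLeastBusy(const std::vector<std::string>& candidates,
                          const std::map<std::string, int>& activeTasks) {
    assert(!candidates.empty());
    auto loadOf = [&](const std::string& host) {
        auto it = activeTasks.find(host);
        return it == activeTasks.end() ? 0 : it->second;
    };
    std::string best = candidates.front();
    for (const auto& host : candidates) {
        if (loadOf(host) < loadOf(best)) best = host;
    }
    return best;
}
```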

In Objectivity/C++:
Note: This interface is under evaluation and likely to change.

A task assigner is an instance of the following class:

  class ooTaskAssigner {
   public:
    virtual ooTaskServerGroup* assignTask(const ooLongId & oid);
  };

Given the starting OID of a range to be searched, the assignTask method should return an object designating one or more query-server hosts that are suitable for performing the search. If more than one host is indicated, the task will be automatically assigned to whichever query server is the least busy in order to balance the load.

The return value points to an instance of the following abstract class:

  class ooTaskServerGroup {
  public:
    static ooTaskServerGroup* create(); // make an empty group
    static ooTaskServerGroup* create(const char* hostName); // make with one host
    virtual void addHost(const char* hostName) = 0; // add host to group
    virtual bool removeHost(const char* hostName) = 0; // remove host from group
    virtual int nHosts() const = 0; // return number of hosts in the group
    virtual const char* getHostName(unsigned index) const = 0; // return name of nth host
    virtual ~ooTaskServerGroup() = 0;
 
    // Internal use only:
    virtual opqQueryAgent* assignTask(const ooLongId & oid, ocmSession* session) = 0;
  };

Instances (of an internal subclass) are created by the static create methods. The addHost method can be called any number of times to add hosts to a group. The same instance can be returned from multiple calls to ooTaskAssigner::assignTask. The application program is responsible for deleting an instance when it is no longer needed.
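A custom assigner typically maps each piece of data to the hosts near it. The sketch below assumes that mapping is known per database number and uses HostGroup as a simplified stand-in for ooTaskServerGroup (the real class is abstract and obtained through ooTaskServerGroup::create). It returns the same group instance for every task on a database and keeps ownership in std::unique_ptr, so the groups are freed when the assigner is destroyed. All names are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for ooTaskServerGroup: just an ordered list of host names.
struct HostGroup {
    std::vector<std::string> hosts;
    void addHost(const std::string& host) { hosts.push_back(host); }
};

// Hypothetical assigner: each database maps to a primary host plus a nearby
// host; anything unmapped goes to a default host.
class SiteTaskAssignerSketch {
public:
    explicit SiteTaskAssignerSketch(const std::string& defaultHost)
        : default_(std::make_unique<HostGroup>()) {
        default_->addHost(defaultHost);
    }

    void addDatabase(unsigned db, const std::string& primary, const std::string& nearby) {
        auto group = std::make_unique<HostGroup>();
        group->addHost(primary);
        group->addHost(nearby);
        groups_[db] = std::move(group);
    }

    // assignTask-style lookup. The real method receives the starting OID of a
    // range; this sketch receives only its database number. The same group
    // instance is returned on every call, and the assigner keeps ownership.
    HostGroup* assignTask(unsigned db) const {
        auto it = groups_.find(db);
        return it == groups_.end() ? default_.get() : it->second.get();
    }

private:
    std::unique_ptr<HostGroup> default_;
    std::map<unsigned, std::unique_ptr<HostGroup>> groups_;
};
```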

To use a user-defined task assigner, register it by passing it as the argument to the following static method:

  void ooObjy::setTaskAssigner(ooTaskAssigner* assigner)

The currently registered instance can be obtained by calling:

  ooTaskAssigner* ooObjy::getTaskAssigner()

Note that task assigner registration is global for the process, not specific to the current session.

The following method is also provided:

  ooBoolean ooObjy::checkQueryServer(const char * hostName);

This method returns true if a query server is running on the host named by the argument.

This can be used by a task assigner to avoid returning a host that doesn't actually have a query server running.
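A task assigner can drop candidate hosts that have no running query server before building its group. In this sketch the probe is a callback, so it runs without Objectivity; in Objectivity/C++ the callback would presumably call ooObjy::checkQueryServer with the host name's c_str(). liveHosts is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Keeps only the candidate hosts for which `hasQueryServer` reports a running
// query server, preserving the original order.
std::vector<std::string> liveHosts(
    const std::vector<std::string>& candidates,
    const std::function<bool(const std::string&)>& hasQueryServer) {
    std::vector<std::string> live;
    for (const auto& host : candidates) {
        if (hasQueryServer(host)) live.push_back(host);
    }
    return live;
}
```

What the assigner should do when no candidate is live is not specified by the text and is left to the application.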

 

In Objectivity for Java:
Note: This interface is under evaluation and likely to change.
The Objectivity for Java interface does not support task-assignment groups. Instead, you implement the assignTask method to return a single host name rather than a group of host names.
      
     com.objy.db.app.TaskAssigner interface
            String assignTask(ooId oid)

      Methods in Connection class:
            TaskAssigner getTaskAssigner()
            void setTaskAssigner(TaskAssigner ta)
            boolean checkQueryServer(String hostname)

In-Process Query Server

As an alternative to running separate query server processes for a parallel query, you can start an in-process query server (IPQS) from within the querying application. An IPQS uses multiple threads to search multiple containers or databases in parallel.

You can run an IPQS instead of, or in combination with, any number of separate query server processes. By default, an IPQS searches only the containers and databases that reside locally on the same host running the application. You can use a custom task assigner if you want the IPQS to search data on remote hosts.

Using an IPQS has several advantages:

  • The IPQS can reduce or eliminate the need for separately started query-server processes.
  • The IPQS communicates its search results between threads of the same process, which is more efficient than network messaging between separate processes. Note, however, that an application using an IPQS should run on a host machine with a large number of cores.

An IPQS accepts only search tasks assigned from within the same application. If two querying applications need to search data on the same host, and one of those applications performs its search with an IPQS, you must arrange for the other application to use either an IPQS of its own or a separate query server process. (Two applications can, however, use the same query server process.)

In Objectivity/C++:
Note: This interface is under evaluation and likely to change.

The following method starts an in-process query server in an Objectivity/C++ application:

  void ooObjy::startInternalQueryServer(unsigned numThreads=0);

This method loads and initializes an internal query server with the designated number of worker threads. If the argument is 0, a default number of threads will be used, which will depend on the number of processor cores.

A custom task assigner can use a host name of NULL to indicate the use of an in-process query server.

You can pass NULL to ooObjy::checkQueryServer to test whether an in-process query server is running.
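When an IPQS is combined with remote query servers, a custom assigner might send tasks on local databases to the in-process server (a NULL host name) and everything else to the database's host. This standalone sketch assumes a known database-to-host map and returns const char* to mirror the NULL convention. LocalFirstAssignerSketch is hypothetical, and sending unknown databases to the IPQS is an arbitrary choice made for illustration.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical assigner logic: local or unknown databases go to the IPQS
// (nullptr), and other databases go to the host that stores them.
class LocalFirstAssignerSketch {
public:
    LocalFirstAssignerSketch(std::string localHost, std::map<unsigned, std::string> dbHost)
        : local_(std::move(localHost)), dbHost_(std::move(dbHost)) {}

    // Returns nullptr for the in-process query server, otherwise a host name
    // that stays valid as long as this object is alive and unmodified.
    const char* hostFor(unsigned db) const {
        auto it = dbHost_.find(db);
        if (it == dbHost_.end() || it->second == local_) return nullptr;
        return it->second.c_str();
    }

private:
    std::string local_;
    std::map<unsigned, std::string> dbHost_;
};
```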

Configuring an IPQS:

By default, an IPQS creates its threads on demand, up to the specified number of threads, and each thread uses a session that does not perform session logging and that is created with cacheInitialCapacity=200, cacheMaxCapacity=300, and largeObjectMemoryLimit=0. You can change any of these properties by creating and enabling a configuration file. See the documentation for ooObjy::enableConfiguration in the Objectivity/C++ Programmer's Reference for information about using configuration files.

Within a configuration file, you can add any of the following settings to configure an IPQS:

  <!-- maximumThreads:         maximum number of threads to use for running tasks/queries;
                               if present, overrides the numThreads parameter of the
                               startInternalQueryServer method
       initialThreads:         initial number of threads to create; others are created on demand
       disableLogging:         turns off logging
       cacheInitialCapacity,
       cacheMaxCapacity,
       largeObjectMemoryLimit: control the session cache -->
  <IPQSConfiguration
     maximumThreads="4"
     initialThreads="0"
     disableLogging="0"
     cacheInitialCapacity="200"
     cacheMaxCapacity="300"
     largeObjectMemoryLimit="0"/>

If numThreads is 0 and there is no enabled configuration file (or the file does not set the maximumThreads attribute), then the number of threads used by the IPQS is 2 x numProcessors, where numProcessors is the number of processors or cores in the host machine, or 2, if the number of processors cannot be determined.
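The thread-count rules for an IPQS combine the numThreads argument, the maximumThreads attribute, and the processor count. The sketch below restates them as one function; ipqsThreadCount and detectProcessors are hypothetical helpers. The fallback for an unknown processor count can be read two ways (a total of 2 threads, or numProcessors = 2); this sketch assumes the latter, so that case yields 4 threads. std::thread::hardware_concurrency returns 0 when the count cannot be determined, which is why 0 is used as the "unknown" value.

```cpp
#include <cassert>
#include <optional>
#include <thread>

// Restates the IPQS thread-count rules as one function (hypothetical helper).
//   numThreads         argument passed to startInternalQueryServer
//   configMaxThreads   maximumThreads from an enabled configuration file, if set
//   detectedProcessors processor/core count, or 0 if it could not be determined
unsigned ipqsThreadCount(unsigned numThreads,
                         std::optional<unsigned> configMaxThreads,
                         unsigned detectedProcessors) {
    if (configMaxThreads) return *configMaxThreads;  // overrides numThreads
    if (numThreads != 0) return numThreads;
    // Assumption: the fallback "2" is read as numProcessors = 2 (4 threads);
    // the text could also be read as a total of 2 threads.
    const unsigned numProcessors = detectedProcessors != 0 ? detectedProcessors : 2;
    return 2 * numProcessors;
}

// std::thread::hardware_concurrency() returns 0 when the count is unknown.
unsigned detectProcessors() { return std::thread::hardware_concurrency(); }
```

For example, ipqsThreadCount(0, std::nullopt, detectProcessors()) corresponds to calling startInternalQueryServer with its default argument and no configuration file.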

Date: Tuesday, February 19, 2013
Product: Objectivity/DB
Version: 11.0