Processing Queue Sets

ProcessingQueueSet

typedef struct fluxEngine_C_v1_ProcessingQueueSet fluxEngine_C_v1_ProcessingQueueSet

Processing Queue Set.

This opaque data structure wraps a processing queue set. It may be created via the fluxEngine_C_v1_ProcessingQueueSet_create() function and destroyed via the fluxEngine_C_v1_ProcessingQueueSet_free() function.

A processing queue set is the basic building block that allows data to be processed. Only one thread may perform data processing with the same processing queue set at the same time; locking is used to enforce this restriction.

The user is free to create as many processing queue sets for a given handle as they wish. Processing via multiple processing queue sets may happen from different threads at the same time.

ProcessingQueueSet_create

int fluxEngine_C_v1_ProcessingQueueSet_create(fluxEngine_C_v1_Handle *handle, fluxEngine_C_v1_ProcessingQueueSet **processing_queue_set, fluxEngine_C_v1_Error **error)

Create a new processing queue set.

Creates a new processing queue set for a given fluxEngine handle. This may then be used to create processing contexts.

If the call is successful, a pointer to the newly created processing queue set will be stored in processing_queue_set. This is the typical usage pattern of this function:

fluxEngine_C_v1_Handle* handle = ...;
fluxEngine_C_v1_Error* error = NULL;
fluxEngine_C_v1_ProcessingQueueSet* processing_queue_set = NULL;
int ret = fluxEngine_C_v1_ProcessingQueueSet_create(handle,
                                                    &processing_queue_set,
                                                    &error);
if (ret != 0) {
    // perform error handling
    // ...
    // cleanup error structure
    fluxEngine_C_v1_Error_free(error);
    // don't proceed
    return;
}
// processing_queue_set is now valid
// ...
// at the end, when the processing queue set is no longer
// needed:
fluxEngine_C_v1_ProcessingQueueSet_free(processing_queue_set);

The following specific error codes may be returned by this function:

  • fluxEngine_C_v1_ErrorCode_Unknown

  • fluxEngine_C_v1_ErrorCode_AllocationFailure

  • fluxEngine_C_v1_ErrorCode_InvalidArgument

Parameters:
  • handle – The fluxEngine handle

  • processing_queue_set[out] A pointer to the resulting processing queue set, on success

  • error[out] The resulting error object, if an error occurs. See the documentation of the fluxEngine_C_v1_Error structure for details on error handling.

Returns:

0 on success, -1 on failure

ProcessingQueueSet_create_threads

int fluxEngine_C_v1_ProcessingQueueSet_create_threads(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set, int thread_count, fluxEngine_C_v1_Error **error)

Create processing threads for a given processing queue set.

fluxEngine may use parallelization to speed up processing. To achieve this, background threads must be created to run on additional CPU cores.

Calling this function is optional: by default processing will be single-threaded.

This function will start thread_count - 1 threads when called, as the thread that asks for processing is always considered to be the first thread (with id 0). For example, if 4 is supplied to thread_count this function will start 3 threads that run in the background. The thread that the user uses to call fluxEngine_C_v1_ProcessingContext_process_next() will be considered the thread with id 0, making processing use a total of 4 threads, which is the value supplied for thread_count.
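The arithmetic described above can be sketched as a small helper. The function background_thread_count() is hypothetical and not part of fluxEngine. Returning 0 for values of thread_count below 2 is an assumption of this sketch, since the text above does not say how such values are treated.

```c
/* Hypothetical helper, not part of the fluxEngine API.
 *
 * Returns how many background threads a call with the given
 * thread_count would start according to the description above:
 * the calling thread counts as thread 0, so one fewer thread
 * than thread_count is created.
 *
 * Assumption: values below 2 yield 0 background threads.
 */
int background_thread_count(int thread_count)
{
    return thread_count > 1 ? thread_count - 1 : 0;
}
```

For a thread_count of 4 the helper yields 3, matching the example in the paragraph above.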

This function may only be called if there are currently no background threads associated with this processing queue set. To change the number of threads, fluxEngine_C_v1_ProcessingQueueSet_stop_threads() must be called first.

Any processing context that was created for the same processing queue set before a call to this function was made is marked as invalid: it can no longer be used and can only be destroyed.

The following specific error codes may be returned by this function:

  • fluxEngine_C_v1_ErrorCode_Unknown

  • fluxEngine_C_v1_ErrorCode_AllocationFailure

  • fluxEngine_C_v1_ErrorCode_InvalidArgument

  • fluxEngine_C_v1_ErrorCode_HandleNoLongerValid

  • fluxEngine_C_v1_ErrorCode_ThreadCreationError

Parameters:
  • processing_queue_set – The processing queue set to create the threads for

  • thread_count – The number of threads to use for parallel processing (one less than this number will be created by this function, see the description for details)

  • error[out] The resulting error object, if an error occurs. See the documentation of the fluxEngine_C_v1_Error structure for details on error handling.

Returns:

0 on success, -1 on failure

ProcessingQueueSet_create_threads_ex

int fluxEngine_C_v1_ProcessingQueueSet_create_threads_ex(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set, int thread_count, fluxEngine_C_v1_ThreadInitFunction init_function, void *init_function_context, fluxEngine_C_v1_Error **error)

Create processing threads for a given processing queue set (extended version)

Please read the documentation of fluxEngine_C_v1_ProcessingQueueSet_create_threads() for general details.

This extended function allows the user to supply a thread initialization function that will be called at the beginning of the newly created background threads. This allows the user to customize the thread properties (such as the thread priority or the CPU affinity) themselves.

This function will only return once all thread initialization functions have run.

The thread initialization functions are only called for the background threads that are started by this function. This means that for a thread_count of 4 the initialization function will be called in 3 background threads, and it is up to the user to alter the thread in which they call fluxEngine_C_v1_ProcessingContext_process_next() to process data with fluxEngine.

The threads will be created sequentially: the next thread is created only after the previous thread’s initialization function has completed. This allows the user to directly modify global data structures in the initialization functions without the need for locking.
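The following sketch illustrates why sequential initialization makes locking unnecessary. It hands out one CPU core index per background thread from a shared structure. All names in it (core_assignment, assign_core_on_init, simulate_sequential_init) are hypothetical. The callback signature int (*)(void *) is an assumption; the actual signature is given by fluxEngine_C_v1_ThreadInitFunction and should be checked in the header. The function simulate_sequential_init() only stands in for the behavior described above and runs in the calling thread; it is not how the library is implemented.

```c
#define MAX_BACKGROUND_THREADS 64

/* Hypothetical shared state, passed as init_function_context. */
struct core_assignment {
    int next_core;                         /* next core index to hand out */
    int assigned_count;                    /* number of cores handed out  */
    int assigned[MAX_BACKGROUND_THREADS];  /* core index per init call    */
};

/* Hypothetical initialization callback (assumed signature).
 * It modifies the shared state without a lock, which is only safe
 * because the calls never overlap. */
int assign_core_on_init(void *context)
{
    struct core_assignment *state = context;
    if (state->assigned_count >= MAX_BACKGROUND_THREADS)
        return -1;
    state->assigned[state->assigned_count++] = state->next_core++;
    /* A real callback would now apply the affinity to the current
     * thread with a platform-specific call. */
    return 0;
}

/* Stand-in for the documented behavior: one call per background
 * thread (thread_count - 1 in total), each finishing before the
 * next one starts. */
int simulate_sequential_init(int thread_count,
                             int (*init)(void *), void *context)
{
    for (int i = 1; i < thread_count; ++i) {
        if (init(context) != 0)
            return -1;
    }
    return 0;
}
```

With next_core starting at 1 and a thread_count of 4, the three calls record cores 1, 2 and 3. Core 0 is left for the thread that calls fluxEngine_C_v1_ProcessingContext_process_next(), which the user has to configure separately.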

Important: any attempt to call a function that accesses this processing queue set inside the initialization functions will create a deadlock.

This function may only be called if there are currently no background threads associated with this processing queue set. To change the number of threads, fluxEngine_C_v1_ProcessingQueueSet_stop_threads() must be called first.

Any processing context that was created for the same processing queue set before a call to this function was made is marked as invalid: it can no longer be used and can only be destroyed.

The following specific error codes may be returned by this function:

  • fluxEngine_C_v1_ErrorCode_Unknown

  • fluxEngine_C_v1_ErrorCode_AllocationFailure

  • fluxEngine_C_v1_ErrorCode_InvalidArgument

  • fluxEngine_C_v1_ErrorCode_HandleNoLongerValid

  • fluxEngine_C_v1_ErrorCode_ThreadCreationError

  • fluxEngine_C_v1_ErrorCode_ThreadInitFunctionError

Parameters:
  • processing_queue_set – The processing queue set to create the threads for

  • thread_count – The number of threads to use for parallel processing (one less than this number will be created by this function, see the description for details)

  • init_function – The initialization function to call at the start of each newly created background thread

  • init_function_context – An arbitrary context that will be passed to the initialization function

  • error[out] The resulting error object, if an error occurs. See the documentation of the fluxEngine_C_v1_Error structure for details on error handling.

Returns:

0 on success, -1 on failure

ProcessingQueueSet_stop_threads

void fluxEngine_C_v1_ProcessingQueueSet_stop_threads(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set)

Stop background threads of a processing queue set.

This will stop any background threads that are currently associated with a given processing queue set. If processing is currently active on the handle, it will be aborted, as if fluxEngine_C_v1_ProcessingContext_abort() had been called. In that case this method may take some time, because abort operations are not immediate and this method waits until the abort has completed.

Any processing context that was created for the same processing queue set before a call to this function was made is marked as invalid: it can no longer be used and can only be destroyed.

This method is always successful: the only errors that could occur when calling this method would be non-recoverable.

If NULL is passed to this method, it will do nothing.

Parameters:
  • processing_queue_set – The processing queue set to stop the threads for

ProcessingQueueSet_free

void fluxEngine_C_v1_ProcessingQueueSet_free(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set)

Destroy a processing queue set.

Destroy a processing queue set, freeing its resources. All background threads will be stopped in the same manner as if fluxEngine_C_v1_ProcessingQueueSet_stop_threads() had been called.

Any processing context associated with this processing queue set will be marked as invalid and may no longer be used. However, some memory associated with processing contexts that were not freed prior to this call may remain in use until the user frees each remaining processing context.

All other processing queue sets of the handle will remain valid.

If NULL is passed to this method, it will do nothing.

Parameters:
  • processing_queue_set – The processing queue set to destroy