WorkQueue

Usage

use WorkQueue;
config const workQueueMinTightSpinCount = 8
config const workQueueMaxTightSpinCount = 1024
config const workQueueMinVelocityForFlush = 1
config const workQueueMinEligibleForSteal = 1024
config const workQueueMinDifferenceForSteal = 1024*1024
config const workQueueVerbose = false
param WorkQueueUnlimitedAggregation = -1
param WorkQueueNoAggregation = 0
iter doWorkLoop(wq: WorkQueue(?workType), td: TerminationDetector, doWorkStealing = false, locales = Locales, tasks = 1..here.maxTaskPar): workType
iter doWorkLoop(wq: WorkQueue(?workType), td: TerminationDetector, doWorkStealing = false, locales = Locales, tasks = 1..here.maxTaskPar, param tag: iterKind): workType
proc <=>(ref wq1: WorkQueue, ref wq2: WorkQueue)
record DuplicateCoalescer
type t
var dupValue: t
proc init(type t, dupValue: t)
proc this(A: [?D] ?t)
record NopCoalescer
type t
proc init(type t)
proc this(arr: [?D] t)
proc UninitializedWorkQueue(type workType, coalesceFn: ?t = NopCoalescer(workType))
record WorkQueue
type workType
type colaesceFnType
var instance: unmanaged workTypeWorkQueueImplcolaesceFnType
var pid = -1
proc init(type workType, numAggregatedWork: int = WorkQueueNoAggregation, coalesceFn: ?t = new NopCoalescer(workType))
proc isInitialized()
proc _value
class WorkQueueImpl
type workType
var pid = -1
var queue = new unmanaged Bag(workType)
var destBuffer = UninitializedAggregator(workType)
var dynamicDestBuffer = UninitializedDynamicAggregator(workType)
var asyncTasks: TerminationDetector
var shutdownSignal: atomicbool
var coalesceFn
proc init(type workType, numAggregatedWork: int, coalesceFn: ? = new NopCoalescer(workType))
proc init(other, pid)
proc destroy()
proc globalSize
proc size
proc workPending
proc dsiPrivatize(pid)
proc dsiGetPrivatizeData()
proc getPrivatizedInstance()
proc shutdown()
proc isShutdown()
proc addWork(work: workType, loc: locale)
proc addWork(work: workType, locid = here.id)
proc getWork(): (bool, workType)
proc getWorkBulk(n: integral)
proc isEmpty()
proc flushLocal()
proc flush()
config const workQueueInitialBlockSize = 1024
config const workQueueMaxBlockSize = 1024*1024
class Bag
type eltType
var startIdxEnq: chpl__processorAtomicType(uint)

Helps evenly distribute and balance placement of elements in a best-effort round-robin approach. In the case where we have parallel enqueues or dequeues, they are less likely overlap with each other. Furthermore, it increases our chance to find our ‘ideal’ segment.

var startIdxDeq: chpl__processorAtomicType(uint)
var maxParallelSegmentSpace = {0..#here.maxTaskPar}
var segments: [maxParallelSegmentSpace] BagSegment(eltType)
proc nextStartIdxEnq
proc nextStartIdxDeq
proc init(type eltType)
proc deinit()
proc size
proc addBulk(elts: [?D] eltType)
proc add(elt: eltType): bool
proc removeBulk(n: integral)

Obtain work in bulk; returns an array with a size of at most ‘n’.

proc remove(): (bool, eltType)
proc main()