AggregationBuffer

Usage

use AggregationBuffer;

TODO: Experiment with expanding buffer sizes

config const AggregatorMaxBuffers = -1
config const AggregatorBufferSize = 64*1024
config param AggregatorDebug = false
proc UninitializedAggregator(type msgType)
record Aggregator
type msgType
var instance: unmanaged nilable AggregatorImpl(msgType)
var pid = -1
proc init(type msgType, aggregatorBufferSize: int = AggregatorBufferSize, aggregatorMaxBuffers: int = AggregatorMaxBuffers)
proc init(type msgType, instance: unmanaged nilable AggregatorImpl(msgType), pid: int)
proc init=(other)
proc destroy()
proc isInitialized()
proc _value
class Buffer

A Buffer holds the data that the user has aggregated. If a buffer is returned to the user, it must be recycled back to the buffer pool by invoking ‘done’.

type msgType
proc readWriteThis(f)
proc done()

Recycles this buffer back to the buffer pool. Using the buffer after invoking this method results in undefined behavior.

proc this(idx: integral) ref

Indexes into the buffer. This will be remote if the buffer is remote.

iter these(): msgType

Iterates over the buffer. The buffer is copied to the current locale, so it will be local.

iter these(param tag: iterKind): msgType

Iterates over the buffer in parallel. The buffer is copied to the current locale, so it will be local.

iter these(param tag: iterKind): msgType
iter these(param tag: iterKind, followThis): msgType
proc getPtr()
proc getDomain()
proc getArray()
proc size
proc cap
class AggregatorImpl
type msgType
proc init(type msgType, aggregatorBufferSize: int, aggregatorMaxBuffers: int)
proc init(other, pid: int)
proc deinit()
proc dsiPrivatize(pid)
proc dsiGetPrivatizeData()
proc getPrivatizedInstance()
proc size()
proc sizeGlobal()
proc aggregate(msg: msgType, loc: locale): unmanaged nilable Buffer(msgType)
proc aggregate(msg: msgType, locid: int): unmanaged nilable Buffer(msgType)
iter flushGlobal(targetLocales = Locales): (unmanaged Buffer(msgType), locale)
iter flushGlobal(targetLocales = Locales, param tag: iterKind): (unmanaged Buffer(msgType), locale)
iter flushLocal(targetLocales = Locales): (unmanaged Buffer(msgType), locale)
iter flushLocal(targetLocales = Locales, param tag: iterKind): (unmanaged Buffer(msgType), locale)