The Compute API is a library for working with DCP, the Distributed Compute Protocol, to perform arbitrary computations on the Distributed Computer.
This is a terse API, focused initially on the scientific computing audience, which exposes DCP capabilities in a way that allows developers to easily build applications for DCP that are forward-compatible with future versions of the Compute API and with changes and optimizations to the underlying DCP.
Date | Author | Change |
---|---|---|
Nov 09 2018 | Wes Garland | Initial Release, tracking spec Oct 31 2018 |
General Public, unrestricted
This API focuses on generators (both ad-hoc and from published applications) built around some form of iteration over a common function, and on events. It is implemented in the compute.js module of the dcp package.
The compute module is the holding module for classes and configuration options (especially default options) related to this API.
Most computations on the Distributed Computer operate by mapping an input set to an output set by applying a function to each element in the input set. Input sets can be arbitrary collections of data, but are frequently easily-described number ranges or distributions.
Generators associate functions with input sets, and enable their mapping and distribution on the Distributed Computer. We can create ad-hoc generators with the compute.run
and compute.for
APIs, or we can create generators using functions defined in applications via the Application.prototype.run
and Application.prototype.for
methods.
This function returns a generator handle (an object which corresponds to a generator), and accepts one or two arguments, depending on form.
form 1: compute.run(work)
This form returns a generator handle and accepts one argument, work. Executing this generator will cause work to run a single task on the Distributed Computer. This interface is in place primarily to enable DCP-related testing during software development. When it is executed, the returned promise will be resolved with the value returned by work.
form 2: compute.run(n, work)
This form returns a generator which, when executed, causes work to run n times and resolve the returned promise with an array of values returned by work in no particular order. For each invocation of the work function, work will receive as its sole argument a unique number which is greater than or equal to zero and less than n.
note - When work is a function, it is turned into a string with Function.prototype.toString
before being transmitted to the scheduler. This means that work cannot close over local variables, as these local variables will not be defined in the miner’s worker thread. When work is a string, it is evaluated in the worker thread, and is expected to evaluate to a single function.
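The closure limitation can be demonstrated without DCP at all. The sketch below (plain JavaScript, not part of the API) round-trips a function through Function.prototype.toString(), the way the client serializes work, and shows why free variables do not survive the trip.

```javascript
// Standalone illustration: serializing a function with toString() keeps
// only the source text, so closed-over variables are lost on the far side.
const scale = 10                                  // local to this module
const work = function (i) { return i * 10 }       // safe: no free variables
const broken = function (i) { return i * scale }  // depends on `scale`

const send = (fn) => fn.toString()                // what the client transmits
const receive = (src) => new Function(`return (${src})`)() // worker-side eval

const remoteWork = receive(send(work))
console.log(remoteWork(4))                        // 40
// receive(send(broken)) produces a function that fails wherever `scale`
// is not defined in the evaluation scope -- as in a DCP worker thread.
```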
This function returns an object which corresponds to a generator, and accepts two or more arguments, depending on form. The final argument, work, is scheduled for execution with one slice of the input set, for each element in the set. It is expected that work could be executed multiple times in the same worker thread, so care should be taken not to write functions which depend on uninitialized global state and so on.
Every form of this function returns a generator which, when executed, causes work to run n times and resolve the returned promise with an array of values returned by work, indexed by slice number (position within the set).
When the input set is composed of unique primitive values, the array which resolves the promise will also have its own entries method, which returns an array, indexed by slice number, of {key: value} objects, where key is the input to work and value is the return value of work for that input. This array will be compatible with functions accepting the output of Object.entries() as their input.
The for
method executes a function, work, in the worker by iterating over an n-dimensional series of values. Each iteration is run as a separate slice, and each receives a single (n-dimensional) value. This is an overloaded function, accepting iteration information in a variety of ways. When work returns, the return value is treated as result, which is eventually used as part of the array or object which resolves the returned promise.
note - When work is a function, it is turned into a string with Function.prototype.toString
before being transmitted to the scheduler. This means that work cannot close over local variables, as these local variables will not be defined in the miner’s worker thread. When work is a string, it is evaluated in the worker thread, and is expected to evaluate to a single function.
form 1: for (rangeObject, work)
This form accepts a range object, rangeObject (see below); the range object is used as part of the generator on the scheduler. This makes the form very efficient, particularly for large ranges, since iteration through the set happens on the scheduler, one item at a time. When the range has { start: 0, step: 1 }, the returned promise is resolved with an array of resultant values. Otherwise, the returned promise is resolved with an object whose keys are the values in the range.
form 2a: for (start, end, step, work)
- start, end, and step are numbers used to create a range object. Otherwise, this is the same as form 1.
form 2b: for (start, end, work)
- exactly the same as form 2a, except step is always 1.
form 3: for ([rangeObject, rangeObject...], work)
Similar to form 1, except we are dealing with an array of zero or more range objects. These are used to create multi-dimensional ranges, like nested loops. If they were written as traditional loops, the outermost loop would be the leftmost range object and the innermost loop would be the rightmost range object.
The promise is resolved following the same rules as in form 1, except the arrays/objects nest with each range object. (See examples for more clarity)
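The nesting rule can be modeled with a small client-side sketch (plain JavaScript, not scheduler code): expanding an array of range objects exactly the way the equivalent nested loops would, leftmost range outermost.

```javascript
// Sketch: expand an array of range objects into the tuples the scheduler
// would iterate over, leftmost range as the outermost loop.
function* expand(ranges) {
  function* walk(dim, tuple) {
    if (dim === ranges.length) { yield tuple; return }
    const { start, end, step = 1 } = ranges[dim]
    for (let v = start; v <= end; v += step)
      yield* walk(dim + 1, tuple.concat(v))
  }
  yield* walk(0, [])
}

const tuples = [...expand([{ start: 1, end: 2 }, { start: 3, end: 5 }])]
console.log(tuples)
// [ [1,3], [1,4], [1,5], [2,3], [2,4], [2,5] ]
```

Each tuple corresponds to one slice; its elements become the arguments to the work function.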
form 4: for (iterableObject, work)
Every form of the for()
function maps an input set onto an output set by means of a work function.
The promise is resolved with an Array-like object which represents this mapping, indexed by slice number. Additional, non-enumerable methods will be available on this object to make marrying the two sets together more straightforward. These methods are based on methods of Object:

entries() – returns an array of [input, output] pairs, in the same order that the data appear in the input set, where work(input) yields output. If the input to work had more than one argument, input will be an Array that is equivalent to the arguments vector that work was called with.

fromEntries() – returns an object associating each input with its output, for all elements in the input set where input is an ES5 primitive type. In the case where there are key collisions, the key closest to the end of the input set will be used. Equivalent to Object.fromEntries(results.entries()) (see https://tc39.github.io/proposal-object-from-entries/).

keys() – returns an array of the input values (e.g. results.keys()[n]), taking steps to avoid re-generating the entire input set if not necessary.

Range objects are vanilla ES objects used to describe value range sets for use by compute.for(). Calculations made to derive the set of numbers in a range are carried out with BigNumber (i.e. arbitrary-precision) support. The numbers Infinity and -Infinity are not supported, and we do not differentiate between +0 and -0.
Describing value range sets, rather than simply enumerating ranges, is important because we need to be able to eventually schedule very large sets without the overhead of transmitting them to the scheduler, storing them, and so on.
Range Objects are plain JavaScript objects with the following properties:

start – the lowest number in the range

end – the highest number in the range

step – the increment used between iterations, to get from start to end

group – the number of consecutive elements in the range which will be processed by each slice. If this value is specified, even if it is specified as 1, the function called in the worker thread (i.e. work) will receive as its argument an array of elements in the set. When this value is not specified, the function called in the worker thread will receive a single datum as its input.

When end - start is not an exact multiple of step, the generator will behave as though end were the nearest number in the range which is an even multiple of step, offset by start. For example, the highest number generated by the range object {start: 0, end: 1000, step: 3} would be 999.
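The end-rounding rule reduces to simple arithmetic; a minimal sketch (hypothetical helper, not part of the API):

```javascript
// The effective end of a range is the largest start + k*step <= end.
function effectiveEnd({ start, end, step = 1 }) {
  return start + step * Math.floor((end - start) / step)
}

console.log(effectiveEnd({ start: 0, end: 1000, step: 3 }))  // 999
console.log(effectiveEnd({ start: 10, end: 13, step: 2 }))   // 12
```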
Distribution objects are used with compute.for, much like range objects. They are created by methods of the set export of the stats module, and are used to describe input sets which follow common distributions used in the field of statistics. The following methods are exported:
normal(n, x̄, σ) – generates a set of numbers arranged in a normal distribution, where
– n represents the size of the set
– x̄ represents the mean of the distribution
– σ represents the standard deviation of the distribution
binomial(n, p) – generates a set of numbers arranged in a binomial distribution, where
– n represents the size of the set
– p represents the probability of success on an individual trial
poisson(n, λ, x̄) – generates a set of Poisson-distributed random numbers, where
– n represents the size of the set
– λ represents the rate of occurrence
– x̄ represents the mean of the distribution. If unspecified, the distribution will have the range [0,1].
random(n, min, max) – generates a set of randomly-distributed numbers, where
– n represents the size of the set
– min represents the smallest number in the set
– max represents the smallest number which is greater than min but not in the set
```javascript
let stats = require('stats')
let g = compute.for(stats.set.poisson(10, 0.1, 100), (i) => i)
```
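The internals of the stats module are not specified here; the following standalone sketch merely illustrates the contract of normal(n, x̄, σ) using a Box-Muller transform (the real implementation may differ):

```javascript
// Hypothetical sketch of stats.set.normal(n, mean, sd): a set of n numbers
// whose sample mean approaches x̄ and whose spread approaches σ.
function normal(n, mean = 0, sd = 1) {
  const out = []
  for (let i = 0; i < n; i++) {
    // Box-Muller transform: two uniforms -> one standard normal deviate
    const u = 1 - Math.random()  // avoid log(0)
    const v = Math.random()
    const z = Math.sqrt(-2 * Math.log(u)) * Math.cos(2 * Math.PI * v)
    out.push(mean + sd * z)
  }
  return out
}

const set = normal(10000, 100, 15)
const sampleMean = set.reduce((a, b) => a + b) / set.length
console.log(set.length, sampleMean.toFixed(1))  // 10000, approximately 100
```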
Generator handles are objects which correspond to generators. They are created by some exports of the compute module, such as compute.run
and compute.for
.
Generator handles have the following properties:

requirePath – an array of module identifiers which are prepended to require.path in the worker before work is evaluated in the main module.

modulePath – an array of module identifiers which are prepended to module.path in the worker thread's main module before work is evaluated in the main module.

an object with properties total and complete, which is automatically updated while the generator is deployed, as results arrive at the client. Note – total will be undefined until the generator has been deployed.

the scheduler this generator will be deployed on. If unspecified, we use compute.default.scheduler; if that is undefined, we will use dcpConfig.scheduler.

the value returned by the estimate() method when exec() was last called for this generator.

Generator handles have the following methods:
was last called for this generator.cancel - Takes as its sole argument an id string and tells the scheduler to cancel a deployed generator. This method returns a promise which is resolved once the scheduler acknowledges the cancellation and has transitioned to a state where no further costs will be incurred as a result of this generator.
resume - Takes as its sole argument an id string which was previously returned from exec, and returns a generator handle with all the same properties and methods as the object that was originally passed to exec. Note that compute.resume does not reestablish client-side event handlers.
exec - tells the scheduler to deploy the generator. This method accepts arguments costProfile
and paymentWallet
, which, when not undefined, update the same-named properties of the generator handle. Invoking this method a second (or nth) time returns a new promise and transmits a new cost profile to the scheduler; the payment wallet cannot be changed after the generator has been deployed.
This method returns a promise which is resolved once the client has received results for every slice. When this method is called for a generator which is already deployed, any changes to the cost profile will be transmitted to the scheduler, and the scheduler will apply them on a best-effort basis to tasks generated after the change request. Note that a new cost profile will generally cause the scheduler to alter the quantity of DCC currently in escrow for that generator. There is no guarantee that changes to the cost profile will occur instantaneously.
This method can reject the promise with errors from the scheduler. Any of the errors below imply that the scheduler has paused the generator:
ENOFUNDS - insufficient credit in the account associated with the payment wallet. This error can happen immediately upon deployment, or partway through the generator if the scheduler discovers that slices/tasks are using more compute than estimated and it needs to re-escrow. Calling exec
on this generator handle after an ENOFUNDS error will cause the scheduler to attempt to resume the generator as described above.
If the promise is rejected with the errors below, the scheduler has cancelled the generator:
ENOPROGRESS - the scheduler has determined that too many worker threads are not receiving regular progress update messages, are receiving invalid progress messages, or that slices are completing without emitting at least one progress update message.
ETOOMANYSLICES - the scheduler has determined that this generator exceeds the maximum number of allowable slices per generator
ENOMEM - the scheduler has determined that this generator exceeds the maximum allowable worker memory footprint
EWORKTOOBIG - the scheduler has determined that the combined size of the work function and local modules associated with this generator exceeds the maximum allowable size
ETOOBIG - the scheduler has determined that this generator exceeds the maximum allowable amount of work
ETOOSLOW - the scheduler has determined that this generator exceeds the maximum allowable execution time on the reference core
ESLICETOOSLOW - the scheduler has determined that individual slices in this generator exceed the maximum allowable execution time on the reference core
ETOOMANYERRORS - the scheduler has determined that too many worker threads are terminating with uncaught exceptions for this generator
Any other rejections should be treated as transient errors, and client developers should assume that the results could be retrieved by eventually calling await compute.resume(g.id).exec()
. These transient errors include, but are not limited to:
EPERM - client tried to do something prohibited, such as updating the payment wallet of a generator.
localExec: This function is identical to exec, except that the generator is executed locally, in the client. It accepts one argument, cores
. If cores is false, the execution will happen in the same JavaScript context as the client. Otherwise, cores is interpreted as the number of local cores in which to execute the generator. Local modules will be loaded using regular filesystem semantics; modules which originate in packages will be loaded from the module server associated with the current scheduler.
requires: This function specifies a module dependency (when the argument is a string) or a list of dependencies (when the argument is an array) of the work function. This function can be invoked multiple times before deployment. See the Modules and Packages section for more information.
estimate: This function returns a promise which is resolved with an object describing the cost to run the entire generator. This object has the following properties:
coreHours – the estimated number of core-hours required to run the generator to completion
marketCoreHourValue – the current value of one core-hour (on the current scheduler) in DCC for that generator (market value can change based on requirements object).
marketRate – the estimated amount of DCC required to run the generator to completion at the current compute market price. Note: if the generator has already been deployed, this value is only for the remaining tasks.
status – true if the scheduler will allow the generator to run, without regard to cost profile, otherwise it is set to the error that would be returned from exec
if the generator were deployed (e.g. ETOOMANYSLICES, ENOMEM, ETOOBIG, etc.). The status ENOPROGRESS will be returned if the work function does not emit at least one progress update message, regardless of the actual execution time of said function.
estimateSlice: This function operates exactly like estimate
, but only estimates the cost of the first slice.
setPrice: Set the cost profile. This is equivalent to the first argument to exec
.
setPaymentWallet: Set the payment wallet. This is equivalent to the second argument to exec
. Setting the payment wallet after the initial call to exec
will throw EPERM in the setter.
The generator handle is an EventEmitter (see EventEmitters, below), with the following events defined:

– an event fired when exec() has been used to apply a new cost profile; this means that the scheduler has acknowledged that subsequently-dispatched tasks will be costed with the new profile. The argument to the event handler is a BigNum which is the estimated cost per slice in DCC.

– a result event, whose argument has a preventDefault method and properties result and sliceNumber. result is the return value of the work function; sliceNumber identifies the position within the output set where the result belongs. If the preventDefault method is called, the result will not be added to the output set.

– worker-emitted events, which have a sliceNumber property that can be used to determine which element in the input set the worker thread was working on when the event message was emitted.

Work functions (i.e. the final argument to compute.for()) are generally executed in worker threads inside miners. These are the functions which map the input set to the output set.
Each work function receives as its input one element in the input set. Multi-dimensional elements, such as those defined in compute.for() form 3
, will be passed as multiple arguments to the function. The function returns the corresponding output set element, and must emit progress events.
The execution environment is based on CommonJS, providing access to the familiar require()
function, user-defined modules, and modules in packages deployed on Distributed Compute Labs’ module server.
progress(n) – a function that returns true and accepts as its sole argument a number between 0 and 1 (inclusive) representing a best guess at the completed portion of the slice, as a ratio of completed work to total work. If the argument is a string ending in the % symbol, it is interpreted as a percentage in the usual mathematical sense (e.g. "95%" means 0.95).
This function emits a progress event. Progress events should be emitted approximately once per second; a slice which fails to emit a progress event for 30 seconds will have its task cancelled by the supervisor. The argument to this function is interpreted to six significant digits, and must increase for every call.
All work functions must emit at least one progress event - this requirement will be enforced by the estimator.
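A sketch of how a worker might normalize the progress argument (hypothetical helper, not the actual worker implementation; the rounding shown is an approximation of the six-digit rule above):

```javascript
// Normalize a progress value: numbers pass through, strings ending in "%"
// are read as percentages; out-of-range values are rejected.
function normalizeProgress(p) {
  if (typeof p === 'string' && p.endsWith('%'))
    p = parseFloat(p) / 100
  p = Number(p)
  if (Number.isNaN(p) || p < 0 || p > 1)
    throw new RangeError('progress must be a ratio between 0 and 1')
  return Number(p.toFixed(6))  // keep six digits of precision
}

console.log(normalizeProgress('95%'))  // 0.95
console.log(normalizeProgress(0.5))    // 0.5
```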
console.log() – a function which emits console
events on the generator handle in the client with properties level, message, fileName, lineNumber. If the worker thread is running in an environment with a native console
object, the native method may also be invoked. The properties in the client will be as follows:
level – the string "log"

message – the arguments to the function, coerced to string, separated by ", " (comma space)

fileName – the name of the file the function was invoked from, or undefined

lineNumber – the line number in the file where the function was invoked from, or undefined
In the case where the most recent message is identical to the message we are about to emit, the message is not emitted, but rather a “same” counter is incremented. Eventually, we will emit a console
event on the generator handle with the sole property same, having the value of the “same” counter. This will happen when either a new, different, message is logged, the worker terminates, or a progress update event is emitted; whichever comes first.
console.debug() – exactly the same as console.log()
, except with level = “debug”
console.info() – exactly the same as console.log()
, except with level = “info”
console.warn() – exactly the same as console.log()
, except with level = “warn”
console.error() – exactly the same as console.log()
, except with level = “error”. note - some ES environments (Chrome, Firefox) implement C-style print formatting in this method. We may add that in the future.
worker.emit() – emit an arbitrary event, which will be fired by the worker object on the generator handle in the client. The first argument to this function is the event name; the second argument becomes the value passed to the event handler in the client. Note - only values which can be represented in JSON are supported.
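The duplicate-suppression rule for console events described above can be sketched as follows (hypothetical client-side logic, not the actual implementation):

```javascript
// Repeats of the most recent message increment a counter instead of being
// emitted; the counter is flushed as a { same: n } event when a different
// message arrives, the worker terminates, or a progress update is emitted.
function makeConsoleBuffer(emit) {
  let last, sameCount = 0
  return {
    log(message) {
      if (message === last) { sameCount++; return }
      if (sameCount > 0) { emit({ same: sameCount }); sameCount = 0 }
      last = message
      emit({ level: 'log', message })
    },
    flush() {  // called on worker termination or a progress update
      if (sameCount > 0) { emit({ same: sameCount }); sameCount = 0 }
    }
  }
}

const events = []
const c = makeConsoleBuffer((e) => events.push(e))
c.log('hi'); c.log('hi'); c.log('hi'); c.log('bye'); c.flush()
console.log(events)
// [ { level: 'log', message: 'hi' }, { same: 2 },
//   { level: 'log', message: 'bye' } ]
```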
Applications are essentially groups of work functions that live on the scheduler which, when combined with an input set, produce generators. Running generators derived from applications also unlocks extra functionality within the miners, such as the ability to have categories (‘hashTags’) which allow miners to choose which types of work loads they are interested in mining. Each application has one or more functions, which are referenced symbolically instead of being explicitly specified in the generator.
Launching an application on the network is a two-step process. The developer creates and names the application and its functions and submits it to the Distributed Computer. Once the application has been reviewed by Distributed Compute Labs’ staff, it will be available for clients to create generators. Applications can only be updated by publishing new versions, and only submission requests signed by the original submitter’s key will be accepted.
Application handles are objects which correspond to applications. They are created by instantiating the compute.Application
class.
The Application constructor is an overloaded object which is used for defining/publishing applications and referencing applications on the scheduler. An instance of Application which is used for define/publish can also be used to reference that application, but that will generally not happen in the real world, as the two events will be disconnected by significant amounts of time – assuming the application is even approved in the first place.
form 1: This form of the constructor is used for creating/publishing applications. It accepts three arguments: applicationName, version, and publishWallet.
form 2: This form of the constructor is used to access functions which have already been published. It accepts two arguments: applicationName and version.
"^1.0.0"
and "#versionIdHexString"
forms.requirePath
or modulePath
properties on this object will be stored during publish() and will be reflected automatically when the function is used in a generator.require.path
in the worker before requirePath is adjusted for the function (see requirePath in the Generator Handles section)compute.run
, except instead of a function work,we specify the name of the function within the application. Returns a generator handle.compute.for
, except instead of a function work, we specify the name of the function within the application. Returns a generator handle.Cost profiles are used to describe the fee structure that the user is willing to pay for a specific generator’s output. We define three fixed value profiles for use by DCP users; other profiles can be specified as cost profile objects. The fixed value profiles are
0
- request free computationcompute.marketValue
- request computation of the entire generator at market value (as of the time when the generator is executed), with no upper limit whatsoever.compute.safeMarketValue
- request computation at market value, with an upper limit per slice and/or per generator as described in dcp-config
.Possible Future Growth - it might be interesting to att compute.dynamicMarketValue
at some point. This would cause the scheduler to re-compute the market value for that generator every few slices.
Cost profile objects have the following properties:

maxPaymentPerSlice – the most the client is willing to pay for any single slice; if unspecified, maxTotalPayment divided by the size of the input set.

maxTotalPayment – the most the client is willing to pay for the entire generator; if unspecified, maxPaymentPerSlice multiplied by the size of the input set.

Using a cost profile object with both maxPaymentPerSlice and maxTotalPayment undefined is an error. Interfaces can treat this condition the same as ENOFUNDS, since we know there are no bank accounts with infinite funds.
Any interface which accepts a cost profile object (e.g. exec()
) must also handle literal numbers, instances of Number, and BigNums. When a number is used, it is equivalent to an object which specifies maxTotalPayment. i.e., .exec(123)
is the same as .exec({maxTotalPayment: 123}).
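The number-to-object equivalence can be sketched as a small normalizer (hypothetical helper; BigNum handling omitted for brevity):

```javascript
// Any interface taking a cost profile also accepts a bare number, which is
// shorthand for { maxTotalPayment: n }. Both limits undefined is an error.
function normalizeCostProfile(profile) {
  if (typeof profile === 'number' || profile instanceof Number)
    return { maxTotalPayment: Number(profile) }
  const { maxPaymentPerSlice, maxTotalPayment } = profile
  if (maxPaymentPerSlice === undefined && maxTotalPayment === undefined)
    throw new Error('ENOFUNDS')  // no bank account has infinite funds
  return { maxPaymentPerSlice, maxTotalPayment }
}

console.log(normalizeCostProfile(123))  // { maxTotalPayment: 123 }
```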
Requirements objects are used to inform the scheduler about specific execution requirements, which are in turn used as part of the capabilities exchange portion of the scheduler-to-miner interaction.
This object will grow over time, and needs to be organized in a way that we never need to move properties; this means that good top-level namespacing will be critical.
```javascript
let requirements = {
  machine: {
    gpu: true
  },
  engine: {
    es7: true,
    spidermonkey: true
  }
}
```
Boolean requirements are interpreted as follows: true means the capability must be present, false means it must be absent, and capabilities which are not mentioned are ignored. In the example above, only workers with a GPU, running ES7 on SpiderMonkey, would match. In the example below, any worker which can interpret ES7 but is not SpiderMonkey will match:
```javascript
let requirements = {
  engine: {
    es7: true,
    spidermonkey: false
  }
}
```
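The matching rule can be sketched as a small predicate (hypothetical, not scheduler code), comparing a requirements object against a worker's advertised capabilities:

```javascript
// true requires the capability to be present; false requires it absent;
// unmentioned capabilities do not affect the match.
function matches(requirements, capabilities) {
  for (const ns of Object.keys(requirements))
    for (const [cap, wanted] of Object.entries(requirements[ns]))
      if (Boolean(capabilities[ns] && capabilities[ns][cap]) !== wanted)
        return false
  return true
}

const req = { engine: { es7: true, spidermonkey: false } }
console.log(matches(req, { engine: { es7: true, v8: true } }))           // true
console.log(matches(req, { engine: { es7: true, spidermonkey: true } })) // false
```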
All EventEmitters defined in this API will be bound (i.e. have this
set) to the relevant generator when the event handler is invoked, unless the event handler has previously been bound to something else with bind or an arrow function.
The EventEmitters will have the following methods:

on(eventName, listener) – execute the function listener when the event eventName is triggered.

addListener(eventName, listener) – same as on.

addEventListener(eventName, listener) – same as on.

removeListener(eventName, listener) – remove an event listener which has been attached with on or one of its synonyms.

removeEventListener(eventName, callback) – same as removeListener.

The compute module will export an object named default
, which is used to specify default values. This object is always exported, even if it has no properties. The following properties are supported:
scheduler – an object with properties hostname and port. This is the same format as the properties in the scheduler object found in dcpConfig. If unspecified, we will fall back to the values specified in dcpConfig.

The work specified by the generator handle exec
and application handle publish
methods can depend on modules being available in the worker thread. This will be handled by automatically publishing all of the modules which are listed as relative dependencies of the generator. We can assume that dependencies loaded from the require.path are part of pre-published packages.
Unlocking keystores is handled by the Protocol API, in protocol.unlock(keystore, passphrase)
. When the passphrase is undefined or incorrect, the protocol will solicit a passphrase from the user. In the browser, this is done via the keychain API; in Node applications, this is done by prompting for a password from the console in a blocking read with stty echo off. If the Node application detects that the console is not a terminal (see isatty(3)
), it will throw an exception instead of prompting for a password.
```javascript
const fs = require('fs')
const compute = require('dcp/compute')
const protocol = require('dcp/protocol')
const paymentWallet = protocol.unlock(fs.readFileSync('myKey.keystore'))

let g = compute.for(1, 3, function (i) {
  progress('100%')
  return i * 10
})
g.setPrice(compute.safeMarketValue)
g.setPaymentWallet(paymentWallet)
let results = await g.exec()
console.log('results:    ', results)
console.log('entries:    ', results.entries())
console.log('fromEntries:', results.fromEntries())
console.log('keys:       ', results.keys())
console.log('values:     ', results.values())
console.log('key(2):     ', results.key(2))
```
Output:
results: [ 10, 20, 30 ]
entries: [ [ '1', 10 ], [ '2', 20 ], [ '3', 30 ] ]
fromEntries: { '1': 10, '2': 20, '3': 30 }
keys: [ '1', '2', '3' ]
values: [ 10, 20, 30 ]
key(2): 20
```javascript
const paymentAccount = protocol.unlock(fs.readFileSync('myKey.keystore'))
let g = compute.for({start: 10, end: 13, step: 2}, (i) => progress(1) && i)
let results = await g.exec(compute.marketValue, paymentAccount)
console.log(results)
```
Output: [ 10, 12 ]
```javascript
const paymentAccount = protocol.unlock(fs.readFileSync('myKey.keystore'))
let g = compute.for({start: 10, end: 13, group: 2}, (i) => progress(1) && (i[1] - i[0]))
let results = await g.exec(compute.marketValue, paymentAccount)
console.log(results)
```
Output: [ 1, 1 ]
```javascript
const paymentAccount = protocol.unlock(fs.readFileSync('myKey.keystore'))
let g = compute.for([{start: 1, end: 2}, {start: 3, end: 5}], (i, j) => (progress(1), i * j))
let results = await g.exec(compute.marketValue, paymentAccount)
console.log(results)
```
Output: [[3, 4, 5], [6, 8, 10]]
```javascript
const paymentAccount = protocol.unlock(fs.readFileSync('myKey.keystore'))
let g = compute.for([{start: 1, end: 2000}, {start: 3, end: 5}], function (i, j) {
  let best = Infinity
  for (let x = 0; x < i; x++) {
    best = Math.min(best, require('./refiner').refine(x, j))
    progress(x / i)
  }
  return best
})
let results = await g.exec(compute.marketValue, paymentAccount)
console.log(results)
```
```javascript
const paymentAccount = protocol.unlock(fs.readFileSync('myKey.keystore'))
let g = compute.for([123, 456], function (i) {
  progress(1)
  return i
})
let results = await g.exec(compute.marketValue, paymentAccount)
console.log(results)
```
Output: [ 123, 456 ]
```javascript
const paymentAccount = protocol.unlock(fs.readFileSync('myKey.keystore'))
let results = await compute
  .for([123, 456], (i) => progress(1) && i / 10)
  .exec(0, paymentAccount)
console.log(results)
```
Output: [ 12.3, 45.6 ]
```javascript
function* fruitList() {
  yield "banana"
  yield "orange"
  yield "apple"
}

let g = compute.for(fruitList(), (fruit) => progress(1) && fruit + 's are yummy!')
g.requirements = { machine: { gpu: true } }
g.paymentWallet = protocol.unlock(fs.readFileSync('myKey.keystore'))
results = await g.exec()
console.log(results.join('\n'))
```
Output:
bananas are yummy!
oranges are yummy!
apples are yummy!
```javascript
const fs = require('fs')
const process = require('process')

let g = compute.for(0, 1000, (i) => progress(1) && i)
g.on("accepted", function (ev) {
  fs.writeFileSync('gId.txt', this.id, 'ascii')
  process.exit(0)
})
results = await g.exec(compute.marketValue, protocol.unlock(fs.readFileSync('myKey.keystore')))
console.log(results.reduce((a, val) => a + val)) /* probably not reached */
```
(new program - gets run later)
```javascript
let gId = fs.readFileSync('gId.txt', 'ascii')
let g = compute.resume(gId)
results = await g.exec()
console.log(results.reduce((a, val) => a + val))
```
Output: 499500
```javascript
let g = compute.for(2, 1000, (i) => progress(1) && i)
setInterval(function () {
  g.exec(compute.safeMarketValue) /* update bid price every five minutes */
}, 5 * 60000)
results = await g.exec(compute.safeMarketValue, protocol.unlock(fs.readFileSync('myKey.keystore')))
console.log(results.reduce((a, val) => a + val))
```
Output: 499499
```javascript
let app = new compute.Application("videoProcessor", "1.0.0", identificationWallet)
app.requirePath.push("core")
app.defineFunction("enhance", ["./ffmpeg", "core/serializer"], enhanceFunction)
app.defineFunction("vignette", ["./ffmpeg", "core/serializer"], vignetteFunction)
let stabilize = app.defineFunction("stabilize", ["./ffmpeg", "core/serializer"], stabilizeFunction)
stabilize.requires({machine: {gpu: true}})
let appRequestId = await app.publish()

let vp = new compute.Application("videoProcessor", "^1.0.0")
let g = vp.for(frames, "stabilize")
let results = await g.exec(compute.marketValue, paymentWallet)
```