- Add VariableSpec class with magnitude field validation - Add Formula class supporting multiple input/output variables - Support d4rt_code as string or object with code field - Add comprehensive tests for parsing and serialization - Fix broken test import in pruebas_d4rt_test.dart Follows README.md format requirements exactly
377 lines
13 KiB
Dart
377 lines
13 KiB
Dart
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
|
||
import 'dart:async';
|
||
import 'dart:collection';
|
||
|
||
import 'package:async/async.dart';
|
||
import 'package:stack_trace/stack_trace.dart';
|
||
|
||
/// Manages an abstract pool of resources with a limit on how many may be in use
/// at once.
///
/// When a resource is needed, the user should call [request]. When the returned
/// future completes with a [PoolResource], the resource may be allocated. Once
/// the resource has been released, the user should call [PoolResource.release].
/// The pool will ensure that only a certain number of [PoolResource]s may be
/// allocated at once.
class Pool {
  /// Completers for requests beyond the first [_maxAllocatedResources].
  ///
  /// When an item is released, the next element of [_requestedResources] will
  /// be completed.
  final _requestedResources = Queue<Completer<PoolResource>>();

  /// Callbacks that must be called before additional resources can be
  /// allocated.
  ///
  /// See [PoolResource.allowRelease].
  final _onReleaseCallbacks = Queue<void Function()>();

  /// Completers that will be completed once `onRelease` callbacks are done
  /// running.
  ///
  /// These are kept in a queue to ensure that the earliest request completes
  /// first regardless of what order the `onRelease` callbacks complete in.
  final _onReleaseCompleters = Queue<Completer<PoolResource>>();

  /// The maximum number of resources that may be allocated at once.
  final int _maxAllocatedResources;

  /// The number of resources that are currently allocated.
  int _allocatedResources = 0;

  /// The timeout timer.
  ///
  /// This timer is canceled as long as the pool is below the resource limit.
  /// It's reset once the resource limit is reached and again every time a
  /// resource is released or a new resource is requested. If it fires, that
  /// indicates that the caller became deadlocked, likely due to files waiting
  /// for additional files to be read before they could be closed.
  ///
  /// This is `null` if this pool shouldn't time out. It is also set to `null`
  /// by [_onTimeout] once the timeout has fired.
  RestartableTimer? _timer;

  /// The amount of time to wait before timing out the pending resources.
  final Duration? _timeout;

  /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
  /// that have been marked releasable.
  ///
  /// This is `null` until [close] is called.
  FutureGroup? _closeGroup;

  /// Whether [close] has been called.
  bool get isClosed => _closeMemo.hasRun;

  /// A future that completes once the pool is closed and all its outstanding
  /// resources have been released.
  ///
  /// If any [PoolResource.allowRelease] callback throws an exception after the
  /// pool is closed, this completes with that exception.
  Future get done => _closeMemo.future;

  /// Creates a new pool with the given limit on how many resources may be
  /// allocated at once.
  ///
  /// If [timeout] is passed, then if that much time passes without any activity
  /// all pending [request] futures will throw a [TimeoutException]. This is
  /// intended to avoid deadlocks.
  ///
  /// Throws an [ArgumentError] if the limit is not greater than zero.
  Pool(this._maxAllocatedResources, {Duration? timeout}) : _timeout = timeout {
    if (_maxAllocatedResources <= 0) {
      throw ArgumentError.value(_maxAllocatedResources, 'maxAllocatedResources',
          'Must be greater than zero.');
    }

    if (timeout != null) {
      // Start the timer canceled since we only want to start counting down once
      // we've run out of available resources.
      _timer = RestartableTimer(timeout, _onTimeout)..cancel();
    }
  }

  /// Request a [PoolResource].
  ///
  /// If the maximum number of resources is already allocated, this will delay
  /// until one of them is released.
  ///
  /// Throws a [StateError] synchronously if the pool is already closed.
  Future<PoolResource> request() {
    if (isClosed) {
      throw StateError('request() may not be called on a closed Pool.');
    }

    if (_allocatedResources < _maxAllocatedResources) {
      // Capacity is available: hand out a resource right away.
      _allocatedResources++;
      return Future.value(PoolResource._(this));
    } else if (_onReleaseCallbacks.isNotEmpty) {
      // A resource was marked releasable: run its callback, then reuse its
      // slot for this request.
      return _runOnRelease(_onReleaseCallbacks.removeFirst());
    } else {
      // Nothing is available: queue the request until a resource is released.
      var completer = Completer<PoolResource>();
      _requestedResources.add(completer);
      _resetTimer();
      return completer.future;
    }
  }

  /// Requests a resource for the duration of [callback], which may return a
  /// Future.
  ///
  /// The return value of [callback] is piped to the returned Future. The
  /// resource is released once [callback] completes, whether it succeeds or
  /// throws.
  ///
  /// The returned Future completes with a [StateError] if the pool is already
  /// closed.
  Future<T> withResource<T>(FutureOr<T> Function() callback) async {
    if (isClosed) {
      throw StateError('withResource() may not be called on a closed Pool.');
    }

    var resource = await request();
    try {
      return await callback();
    } finally {
      resource.release();
    }
  }

  /// Returns a [Stream] containing the result of [action] applied to each
  /// element of [elements].
  ///
  /// While [action] is invoked on each element of [elements] in order,
  /// it's possible the return [Stream] may have items out-of-order – especially
  /// if the completion time of [action] varies.
  ///
  /// If [action] throws an error the source item along with the error object
  /// and [StackTrace] are passed to [onError], if it is provided. If [onError]
  /// returns `true`, the error is added to the returned [Stream], otherwise
  /// it is ignored. If [onError] is omitted, every error is added to the
  /// returned [Stream].
  ///
  /// Errors thrown from iterating [elements] will not be passed to
  /// [onError]. They will always be added to the returned stream as an error.
  ///
  /// Note: all of the resources of this [Pool] will be used when the
  /// returned [Stream] is listened to until it is completed or canceled.
  ///
  /// Note: if this [Pool] is closed before the returned [Stream] is listened
  /// to, a [StateError] is thrown.
  Stream<T> forEach<S, T>(
      Iterable<S> elements, FutureOr<T> Function(S source) action,
      {bool Function(S item, Object error, StackTrace stack)? onError}) {
    // Resolve the default once into a non-nullable local so the worker
    // closure below does not need a null assertion.
    final bool Function(S item, Object error, StackTrace stack) shouldReport =
        onError ?? ((item, error, stack) => true);

    // Set when the subscription is cancelled; workers stop at their next
    // check.
    var cancelPending = false;

    // Non-null while the subscription is paused; workers wait on it.
    Completer? resumeCompleter;
    late StreamController<T> controller;

    // Shared by every worker; assigned when the stream is listened to.
    late Iterator<S> iterator;

    // One worker: pulls elements from the shared iterator until it is
    // exhausted or the subscription is cancelled.
    Future<void> run(int _) async {
      while (iterator.moveNext()) {
        // caching `current` is necessary because there are async breaks
        // in this code and `iterator` is shared across many workers
        final current = iterator.current;

        _resetTimer();

        // Wait here while the subscription is paused. The shared variable is
        // reassigned by the controller callbacks, so read it into a local.
        final paused = resumeCompleter;
        if (paused != null) {
          await paused.future;
        }

        if (cancelPending) {
          break;
        }

        T value;
        try {
          value = await action(current);
        } catch (e, stack) {
          if (shouldReport(current, e, stack)) {
            controller.addError(e, stack);
          }
          continue;
        }
        controller.add(value);
      }
    }

    Future<void>? doneFuture;

    void onListen() {
      iterator = elements.iterator;

      assert(doneFuture == null);
      // Start one worker per pool slot, each holding a resource while it runs.
      var futures = Iterable<Future<void>>.generate(
          _maxAllocatedResources, (i) => withResource(() => run(i)));
      doneFuture = Future.wait(futures, eagerError: true)
          .then<void>((_) {})
          .catchError(controller.addError);

      doneFuture!.whenComplete(controller.close);
    }

    controller = StreamController<T>(
      sync: true,
      onListen: onListen,
      onCancel: () async {
        assert(!cancelPending);
        cancelPending = true;
        // A subscription that is cancelled while paused never receives
        // `onResume`, so wake any workers waiting on the pause completer.
        // They then observe `cancelPending` and stop; otherwise `doneFuture`
        // would never complete and their resources would never be released.
        resumeCompleter?.complete();
        resumeCompleter = null;
        await doneFuture;
      },
      onPause: () {
        assert(resumeCompleter == null);
        resumeCompleter = Completer<void>();
      },
      onResume: () {
        assert(resumeCompleter != null);
        resumeCompleter?.complete();
        resumeCompleter = null;
      },
    );

    return controller.stream;
  }

  /// Closes the pool so that no more resources are requested.
  ///
  /// Existing resource requests remain unchanged.
  ///
  /// Any resources that are marked as releasable using
  /// [PoolResource.allowRelease] are released immediately. Once all resources
  /// have been released and any `onRelease` callbacks have completed, the
  /// returned future completes successfully. If any `onRelease` callback throws
  /// an error, the returned future completes with that error.
  ///
  /// This may be called more than once; it returns the same [Future] each time.
  Future close() => _closeMemo.runOnce(() {
        // NOTE(review): defensive guard; presumably only reachable if this
        // closure is entered again after [_closeGroup] has been created.
        if (_closeGroup != null) return _closeGroup!.future;

        _resetTimer();

        _closeGroup = FutureGroup();
        // Run every deferred `onRelease` callback now and track its result.
        for (var callback in _onReleaseCallbacks) {
          _closeGroup!.add(Future.sync(callback));
        }

        // Those resources no longer count as allocated.
        _allocatedResources -= _onReleaseCallbacks.length;
        _onReleaseCallbacks.clear();

        if (_allocatedResources == 0) _closeGroup!.close();
        return _closeGroup!.future;
      });

  /// Memoizes the body of [close]; also backs [isClosed] and [done].
  final _closeMemo = AsyncMemoizer();

  /// If there are any pending requests, this will fire the oldest one.
  ///
  /// Otherwise the allocation count is decremented, and if the pool is closed
  /// and nothing remains allocated, [_closeGroup] is closed.
  void _onResourceReleased() {
    _resetTimer();

    if (_requestedResources.isNotEmpty) {
      // Hand the freed slot straight to the oldest waiter; the allocation
      // count is unchanged.
      var pending = _requestedResources.removeFirst();
      pending.complete(PoolResource._(this));
    } else {
      _allocatedResources--;
      if (isClosed && _allocatedResources == 0) _closeGroup!.close();
    }
  }

  /// If there are any pending requests, this will fire the oldest one after
  /// running [onRelease].
  ///
  /// If there are none and the pool is closed, [onRelease] is run immediately
  /// and tracked by [_closeGroup]. Otherwise [onRelease] is stored in
  /// [_onReleaseCallbacks] until the slot is needed.
  void _onResourceReleaseAllowed(Function() onRelease) {
    _resetTimer();

    if (_requestedResources.isNotEmpty) {
      var pending = _requestedResources.removeFirst();
      pending.complete(_runOnRelease(onRelease));
    } else if (isClosed) {
      _closeGroup!.add(Future.sync(onRelease));
      _allocatedResources--;
      if (_allocatedResources == 0) _closeGroup!.close();
    } else {
      // Bind the callback to the current zone so that it runs there when it
      // is eventually invoked.
      var zone = Zone.current;
      var registered = zone.registerCallback(onRelease);
      _onReleaseCallbacks.add(() => zone.run(registered));
    }
  }

  /// Runs [onRelease] and returns a Future that completes to a resource once an
  /// [onRelease] callback completes.
  ///
  /// Futures returned by [_runOnRelease] always complete in the order they were
  /// created, even if earlier [onRelease] callbacks take longer to run.
  Future<PoolResource> _runOnRelease(Function() onRelease) {
    // Whichever callback finishes first completes the oldest queued
    // completer, not necessarily the one created by this call.
    Future.sync(onRelease).then((value) {
      _onReleaseCompleters.removeFirst().complete(PoolResource._(this));
    }).catchError((Object error, StackTrace stackTrace) {
      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
    });

    var completer = Completer<PoolResource>.sync();
    _onReleaseCompleters.add(completer);
    return completer.future;
  }

  /// A resource has been requested, allocated, or released.
  ///
  /// Cancels the timer when no requests are waiting and restarts it otherwise.
  /// Does nothing if there is no timer.
  void _resetTimer() {
    if (_timer == null) return;

    if (_requestedResources.isEmpty) {
      _timer!.cancel();
    } else {
      _timer!.reset();
    }
  }

  /// Handles [_timer] timing out by causing all pending resource completers to
  /// emit exceptions.
  ///
  /// This also sets [_timer] to `null`, so [_resetTimer] does nothing from
  /// then on and later requests are not subject to the timeout.
  void _onTimeout() {
    for (var completer in _requestedResources) {
      completer.completeError(
          TimeoutException(
              'Pool deadlock: all resources have been '
              'allocated for too long.',
              _timeout),
          Chain.current());
    }
    _requestedResources.clear();
    _timer = null;
  }
}
|
||
|
||
/// A member of a [Pool].
///
/// A [PoolResource] is a token that indicates that a resource is allocated.
/// When the associated resource is released, the user should call [release].
class PoolResource {
  /// The pool that handed out this token.
  final Pool _pool;

  /// Whether `this` has been released yet.
  bool _released = false;

  PoolResource._(this._pool);

  /// Notifies the owning [Pool] that the resource behind this token is no
  /// longer allocated, so a new [PoolResource] may be handed out.
  ///
  /// Throws a [StateError] if this token was already released, either through
  /// this method or through [allowRelease].
  void release() {
    _markReleased();
    _pool._onResourceReleased();
  }

  /// Notifies the owning [Pool] that the resource behind this token is no
  /// longer needed, but should stay allocated until more resources are
  /// required.
  ///
  /// When [Pool.request] is called and no resources remain available,
  /// [onRelease] is invoked. It should free the resource, and may return a
  /// Future or `null`. Once that completes, the [Pool.request] call completes
  /// to a new [PoolResource].
  ///
  /// This suits resources whose main work is finished but which may still
  /// produce information later. For example, an isolate's task may be
  /// complete while it can still emit asynchronous errors.
  ///
  /// Throws a [StateError] if this token was already released, either through
  /// this method or through [release].
  void allowRelease(Function() onRelease) {
    _markReleased();
    _pool._onResourceReleaseAllowed(onRelease);
  }

  /// Flags this token as released, throwing a [StateError] if it already was.
  void _markReleased() {
    if (_released) {
      throw StateError('A PoolResource may only be released once.');
    }
    _released = true;
  }
}
|