Skip to content

Commit edd317d

Browse files
committed
csp event loop, async bridges
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
1 parent ef9ff93 commit edd317d

33 files changed

Lines changed: 9806 additions & 39 deletions

conda/dev-environment-unix.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies:
1414
- deprecated
1515
- docutils<0.22.1
1616
- exprtk
17+
- fastapi
1718
- flex
1819
- graphviz
1920
- gtest
@@ -57,6 +58,7 @@ dependencies:
5758
- twine
5859
- typing-extensions
5960
- unzip
61+
- uvicorn
6062
- wheel
6163
- zip
6264
- zlib

conda/dev-environment-win.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies:
1414
- deprecated
1515
- docutils<0.22.1
1616
- exprtk
17+
- fastapi
1718
# - flex # not available on windows
1819
- graphviz
1920
- gtest
@@ -57,6 +58,7 @@ dependencies:
5758
- twine
5859
- typing-extensions
5960
# - unzip # not available on windows
61+
- uvicorn
6062
- wheel
6163
# - zip # not available on windows
6264
- zlib

cpp/csp/engine/RootEngine.cpp

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ RootEngine::RootEngine( const Dictionary & settings ) : Engine( m_cycleStepTable
7575
m_cycleCount( 0 ),
7676
m_settings( settings ),
7777
m_inRealtime( false ),
78+
m_stepping( false ),
7879
m_initSignalCount( g_SIGNAL_COUNT ),
7980
m_pushEventQueue( m_settings.queueWaitTime > TimeDelta::ZERO() )
8081
{
@@ -223,7 +224,7 @@ void RootEngine::runRealtime( DateTime end )
223224

224225
for( auto * group : dirtyGroups )
225226
group -> state = PushGroup::NONE;
226-
227+
227228
dirtyGroups.clear();
228229
haveEvents = false;
229230
}
@@ -290,6 +291,144 @@ void RootEngine::shutdown( std::exception_ptr except_ptr )
290291
m_exception_ptr = except_ptr;
291292
}
292293

294+
void RootEngine::startStepping( DateTime start, DateTime end )
295+
{
296+
if( m_stepping )
297+
CSP_THROW( RuntimeException, "Engine is already in stepping mode" );
298+
299+
preRun( start, end );
300+
301+
m_exception_mutex.lock();
302+
if( m_state != State::SHUTDOWN )
303+
m_state = State::RUNNING;
304+
m_exception_mutex.unlock();
305+
306+
m_stepping = true;
307+
m_inRealtime = m_settings.realtime;
308+
m_stepDirtyGroups.clear();
309+
}
310+
311+
bool RootEngine::step( TimeDelta maxWait )
312+
{
313+
if( !m_stepping )
314+
CSP_THROW( RuntimeException, "Engine is not in stepping mode. Call startStepping() first." );
315+
316+
if( m_state != State::RUNNING || interrupted() )
317+
return false;
318+
319+
// Check if we've passed the end time
320+
if( m_now > m_endTime )
321+
return false;
322+
323+
bool hasWork = false;
324+
325+
if( m_inRealtime )
326+
{
327+
// Realtime mode: check for push events and timers
328+
TimeDelta waitTime = maxWait;
329+
if( waitTime == TimeDelta::NONE() )
330+
waitTime = TimeDelta::ZERO();
331+
332+
if( !m_pendingPushEvents.hasEvents() )
333+
{
334+
DateTime now = DateTime::now();
335+
if( m_scheduler.hasEvents() )
336+
waitTime = std::min( m_scheduler.nextTime() - now, waitTime );
337+
}
338+
339+
// Don't block in step mode - just check for events
340+
bool haveEvents = m_pushEventQueue.wait( waitTime );
341+
342+
m_now = DateTime::now();
343+
if( m_now > m_endTime )
344+
{
345+
m_now = m_endTime;
346+
return false;
347+
}
348+
349+
++m_cycleCount;
350+
351+
// Execute timers that are ready
352+
if( m_scheduler.hasEvents() && m_scheduler.nextTime() <= m_now )
353+
{
354+
DateTime timerTime = m_scheduler.nextTime();
355+
m_now = timerTime;
356+
m_scheduler.executeNextEvents( m_now );
357+
hasWork = true;
358+
}
359+
else if( haveEvents )
360+
{
361+
// Process push events
362+
PushEvent * events = m_pushEventQueue.popAll();
363+
processPendingPushEvents( m_stepDirtyGroups );
364+
processPushEventQueue( events, m_stepDirtyGroups );
365+
366+
for( auto * group : m_stepDirtyGroups )
367+
group -> state = PushGroup::NONE;
368+
m_stepDirtyGroups.clear();
369+
hasWork = true;
370+
}
371+
}
372+
else
373+
{
374+
// Sim mode: process next scheduled event
375+
if( m_scheduler.hasEvents() )
376+
{
377+
m_now = m_scheduler.nextTime();
378+
if( m_now <= m_endTime )
379+
{
380+
++m_cycleCount;
381+
m_scheduler.executeNextEvents( m_now );
382+
hasWork = true;
383+
}
384+
else
385+
{
386+
m_now = m_endTime;
387+
}
388+
}
389+
}
390+
391+
if( hasWork )
392+
{
393+
m_cycleStepTable.executeCycle( m_profiler.get() );
394+
processEndCycle();
395+
}
396+
397+
// Return true if there's more work to do
398+
return m_state == State::RUNNING && !interrupted() &&
399+
( m_scheduler.hasEvents() || m_pendingPushEvents.hasEvents() );
400+
}
401+
402+
void RootEngine::stopStepping()
403+
{
404+
if( !m_stepping )
405+
return;
406+
407+
try
408+
{
409+
postRun();
410+
}
411+
catch( ... )
412+
{
413+
if( !m_exception_ptr )
414+
m_exception_ptr = std::current_exception();
415+
}
416+
417+
m_state = State::DONE;
418+
m_stepping = false;
419+
m_stepDirtyGroups.clear();
420+
421+
if( m_exception_ptr )
422+
std::rethrow_exception( m_exception_ptr );
423+
}
424+
425+
DateTime RootEngine::nextScheduledTime()
426+
{
427+
if( m_scheduler.hasEvents() )
428+
return m_scheduler.nextTime();
429+
return DateTime::NONE();
430+
}
431+
293432
DictionaryPtr RootEngine::engine_stats() const
294433
{
295434
if( !m_profiler )

cpp/csp/engine/RootEngine.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class EndCycleListener
2525
public:
2626
virtual ~EndCycleListener() {};
2727
virtual void onEndCycle() = 0;
28-
28+
2929
bool isDirty() const { return m_dirty; }
3030
void setDirtyFlag() { m_dirty = true; }
3131
void clearDirtyFlag() { m_dirty = false; }
@@ -57,6 +57,14 @@ class RootEngine : public Engine
5757
void shutdown();
5858
void shutdown( std::exception_ptr except );
5959

60+
// Step-based execution API for asyncio integration
61+
// Allows external event loops to drive CSP execution one step at a time
62+
void startStepping( DateTime start, DateTime end );
63+
bool step( TimeDelta maxWait = TimeDelta::NONE() ); // Returns true if more work pending
64+
void stopStepping();
65+
bool isStepping() const { return m_stepping; }
66+
DateTime nextScheduledTime(); // Returns next scheduled event time, or NONE if none
67+
6068
Scheduler::Handle reserveSchedulerHandle();
6169
Scheduler::Handle scheduleCallback( TimeDelta delta, Scheduler::Callback cb );
6270
Scheduler::Handle scheduleCallback( DateTime time, Scheduler::Callback cb );
@@ -90,7 +98,7 @@ class RootEngine : public Engine
9098
bool interrupted() const;
9199

92100
PushPullEventQueue & pushPullEventQueue() { return m_pushPullEventQueue; }
93-
101+
94102
protected:
95103
enum State { NONE, STARTING, RUNNING, SHUTDOWN, DONE };
96104
using EndCycleListeners = std::vector<EndCycleListener*>;
@@ -131,8 +139,12 @@ class RootEngine : public Engine
131139
PendingPushEvents m_pendingPushEvents;
132140
Settings m_settings;
133141
bool m_inRealtime;
142+
bool m_stepping; // True when in step-based execution mode
134143
int m_initSignalCount;
135144

145+
// For step-based execution
146+
std::vector<PushGroup *> m_stepDirtyGroups;
147+
136148
PushEventQueue m_pushEventQueue;
137149
//This queue is managed entirely from the PushPullInputAdapter
138150
PushPullEventQueue m_pushPullEventQueue;
@@ -168,7 +180,7 @@ inline Scheduler::Handle RootEngine::scheduleCallback( Scheduler::Handle reserve
168180
if( time < m_now ) [[unlikely]]
169181
CSP_THROW( ValueError, "Cannot schedule event in the past. new time: " << time << " now: " << m_now );
170182

171-
return m_scheduler.scheduleCallback( reservedHandle, time, std::move( cb ) );
183+
return m_scheduler.scheduleCallback( reservedHandle, time, std::move( cb ) );
172184
}
173185

174186
inline Scheduler::Handle RootEngine::rescheduleCallback( Scheduler::Handle id, csp::DateTime time )

cpp/csp/python/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ set(CSPIMPL_PUBLIC_HEADERS
3232
NumpyConversions.h
3333
NumpyInputAdapter.h
3434
PyAdapterManagerWrapper.h
35+
PyEventLoop.h
3536
PyBasketInputProxy.h
3637
PyBasketOutputProxy.h
3738
PyCppNode.h
@@ -56,6 +57,7 @@ add_library(cspimpl SHARED
5657
NumpyConversions.cpp
5758
PyAdapterManager.cpp
5859
PyAdapterManagerWrapper.cpp
60+
PyEventLoop.cpp
5961
PyConstAdapter.cpp
6062
PyCppNode.cpp
6163
PyEngine.cpp

cpp/csp/python/PyEngine.cpp

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ static void PyEngine_dealloc( PyEngine * self )
9595
{
9696
CSP_BEGIN_METHOD;
9797
self -> ~PyEngine();
98-
Py_TYPE( self ) -> tp_free( self );
98+
Py_TYPE( self ) -> tp_free( self );
9999
CSP_RETURN;
100100
}
101101

@@ -105,7 +105,7 @@ static PyObject * PyEngine_run( PyEngine * self, PyObject * args )
105105

106106
PyObject * pyStart;
107107
PyObject * pyEnd;
108-
if( !PyArg_ParseTuple( args, "OO", &pyStart, &pyEnd ) )
108+
if( !PyArg_ParseTuple( args, "OO", &pyStart, &pyEnd ) )
109109
return nullptr;
110110

111111
auto start = fromPython<DateTime>( pyStart );
@@ -118,8 +118,94 @@ static PyObject * PyEngine_run( PyEngine * self, PyObject * args )
118118
CSP_RETURN_NONE;
119119
}
120120

121+
static PyObject * PyEngine_startStepping( PyEngine * self, PyObject * args )
122+
{
123+
CSP_BEGIN_METHOD;
124+
125+
PyObject * pyStart;
126+
PyObject * pyEnd;
127+
if( !PyArg_ParseTuple( args, "OO", &pyStart, &pyEnd ) )
128+
return nullptr;
129+
130+
auto start = fromPython<DateTime>( pyStart );
131+
auto end = fromPython<DateTime>( pyEnd );
132+
133+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
134+
self -> rootEngine() -> startStepping( start, end );
135+
136+
Py_RETURN_NONE;
137+
CSP_RETURN_NONE;
138+
}
139+
140+
static PyObject * PyEngine_step( PyEngine * self, PyObject * args )
141+
{
142+
CSP_BEGIN_METHOD;
143+
144+
double maxWaitSeconds = 0.0;
145+
if( !PyArg_ParseTuple( args, "|d", &maxWaitSeconds ) )
146+
return nullptr;
147+
148+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
149+
150+
TimeDelta maxWait = maxWaitSeconds > 0 ? TimeDelta::fromSeconds( maxWaitSeconds ) : TimeDelta::ZERO();
151+
bool hasMore = self -> rootEngine() -> step( maxWait );
152+
153+
return PyBool_FromLong( hasMore );
154+
CSP_RETURN_NONE;
155+
}
156+
157+
static PyObject * PyEngine_stopStepping( PyEngine * self, PyObject * args )
158+
{
159+
CSP_BEGIN_METHOD;
160+
161+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
162+
self -> rootEngine() -> stopStepping();
163+
164+
return self -> collectOutputs();
165+
CSP_RETURN_NONE;
166+
}
167+
168+
static PyObject * PyEngine_isStepping( PyEngine * self, PyObject * args )
169+
{
170+
CSP_BEGIN_METHOD;
171+
172+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
173+
bool stepping = self -> rootEngine() -> isStepping();
174+
175+
return PyBool_FromLong( stepping );
176+
CSP_RETURN_NONE;
177+
}
178+
179+
static PyObject * PyEngine_now( PyEngine * self, PyObject * args )
180+
{
181+
CSP_BEGIN_METHOD;
182+
183+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
184+
DateTime now = self -> rootEngine() -> now();
185+
186+
return toPython( now );
187+
CSP_RETURN_NONE;
188+
}
189+
190+
static PyObject * PyEngine_nextScheduledTime( PyEngine * self, PyObject * args )
191+
{
192+
CSP_BEGIN_METHOD;
193+
194+
CSP_TRUE_OR_THROW_RUNTIME( self -> engine() -> isRootEngine(), "engine is not root engine" );
195+
DateTime nextTime = self -> rootEngine() -> nextScheduledTime();
196+
197+
return toPython( nextTime );
198+
CSP_RETURN_NONE;
199+
}
200+
121201
static PyMethodDef PyEngine_methods[] = {
122-
{ "run", (PyCFunction) PyEngine_run, METH_VARARGS, "start and run engine" },
202+
{ "run", ( PyCFunction ) PyEngine_run, METH_VARARGS, "start and run engine" },
203+
{ "start_stepping", ( PyCFunction ) PyEngine_startStepping, METH_VARARGS, "start engine in step mode" },
204+
{ "step", ( PyCFunction ) PyEngine_step, METH_VARARGS, "execute one step, returns True if more work pending" },
205+
{ "stop_stepping", ( PyCFunction ) PyEngine_stopStepping, METH_NOARGS, "stop stepping and cleanup" },
206+
{ "is_stepping", ( PyCFunction ) PyEngine_isStepping, METH_NOARGS, "check if in stepping mode" },
207+
{ "now", ( PyCFunction ) PyEngine_now, METH_NOARGS, "get current engine time" },
208+
{ "next_scheduled_time", ( PyCFunction ) PyEngine_nextScheduledTime, METH_NOARGS, "get next scheduled event time" },
123209
{ NULL }
124210
};
125211

0 commit comments

Comments
 (0)