Skip to content

Commit d955aae

Browse files
committed
csp event loop, async bridges
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
1 parent ef9ff93 commit d955aae

38 files changed

Lines changed: 11064 additions & 121 deletions

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ if(WIN32)
169169
foreach(warning 4244 4251 4267 4275 4290 4786 4305 4996)
170170
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd${warning}")
171171
endforeach(warning)
172-
add_compile_definitions(WIN32 _WIN32)
172+
add_compile_definitions(WIN32 _WIN32 WIN32_LEAN_AND_MEAN NOMINMAX)
173173

174174
else()
175175
if(CSP_BUILD_NO_CXX_ABI)

conda/dev-environment-unix.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies:
1414
- deprecated
1515
- docutils<0.22.1
1616
- exprtk
17+
- fastapi
1718
- flex
1819
- graphviz
1920
- gtest
@@ -57,6 +58,7 @@ dependencies:
5758
- twine
5859
- typing-extensions
5960
- unzip
61+
- uvicorn
6062
- wheel
6163
- zip
6264
- zlib

conda/dev-environment-win.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies:
1414
- deprecated
1515
- docutils<0.22.1
1616
- exprtk
17+
- fastapi
1718
# - flex # not available on windows
1819
- graphviz
1920
- gtest
@@ -57,6 +58,7 @@ dependencies:
5758
- twine
5859
- typing-extensions
5960
# - unzip # not available on windows
61+
- uvicorn
6062
- wheel
6163
# - zip # not available on windows
6264
- zlib

cpp/csp/core/QueueWaiter.h

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,26 @@
11
#ifndef _IN_CSP_CORE_QUEUEBLOCKINGWAIT_H
22
#define _IN_CSP_CORE_QUEUEBLOCKINGWAIT_H
33

4+
// Windows: Include winsock2.h for FdWaiter socket pair implementation
5+
// WIN32_LEAN_AND_MEAN is defined project-wide to prevent winsock.h/winsock2.h conflicts
6+
#ifdef _WIN32
7+
#include <winsock2.h>
8+
#pragma comment(lib, "ws2_32.lib")
9+
#endif
10+
411
#include <mutex>
512
#include <condition_variable>
613
#include <csp/core/Time.h>
714
#include <csp/core/System.h>
815

16+
#ifdef __linux__
17+
#include <sys/eventfd.h>
18+
#include <unistd.h>
19+
#elif defined(__APPLE__)
20+
#include <unistd.h>
21+
#include <fcntl.h>
22+
#endif
23+
924
namespace csp
1025
{
1126

@@ -43,6 +58,197 @@ class QueueWaiter
4358
bool m_eventsPending;
4459
};
4560

61+
// FdWaiter provides file descriptor based signaling for integration with
62+
// external event loops like asyncio. The read fd can be registered with
63+
// select/poll/epoll and will become readable when notify() is called.
64+
class FdWaiter
65+
{
66+
public:
67+
FdWaiter()
68+
{
69+
#ifdef __linux__
70+
// Linux: use eventfd (single fd, most efficient)
71+
m_eventfd = eventfd( 0, EFD_NONBLOCK | EFD_CLOEXEC );
72+
m_readFd = m_eventfd;
73+
m_writeFd = m_eventfd;
74+
#elif defined(__APPLE__)
75+
// macOS: use pipe
76+
int fds[2];
77+
if( pipe( fds ) == 0 )
78+
{
79+
m_readFd = fds[0];
80+
m_writeFd = fds[1];
81+
// Set non-blocking
82+
fcntl( m_readFd, F_SETFL, O_NONBLOCK );
83+
fcntl( m_writeFd, F_SETFL, O_NONBLOCK );
84+
}
85+
else
86+
{
87+
m_readFd = -1;
88+
m_writeFd = -1;
89+
}
90+
#elif defined(_WIN32)
91+
// Windows: use socket pair (localhost loopback)
92+
m_readFd = INVALID_SOCKET;
93+
m_writeFd = INVALID_SOCKET;
94+
createSocketPair();
95+
#endif
96+
}
97+
98+
~FdWaiter()
99+
{
100+
#ifdef __linux__
101+
if( m_eventfd >= 0 )
102+
close( m_eventfd );
103+
#elif defined(__APPLE__)
104+
if( m_readFd >= 0 )
105+
close( m_readFd );
106+
if( m_writeFd >= 0 )
107+
close( m_writeFd );
108+
#elif defined(_WIN32)
109+
if( m_readFd != INVALID_SOCKET )
110+
closesocket( m_readFd );
111+
if( m_writeFd != INVALID_SOCKET )
112+
closesocket( m_writeFd );
113+
#endif
114+
}
115+
116+
// Get the file descriptor for select/poll registration
117+
// Returns -1 (or INVALID_SOCKET on Windows) if not available
118+
#ifdef _WIN32
119+
SOCKET readFd() const { return m_readFd; }
120+
#else
121+
int readFd() const { return m_readFd; }
122+
#endif
123+
124+
// Signal the fd (makes it readable)
125+
void notify()
126+
{
127+
std::lock_guard<std::mutex> guard( m_lock );
128+
if( m_notified )
129+
return; // Already notified, avoid filling buffer
130+
131+
m_notified = true;
132+
133+
#ifdef __linux__
134+
uint64_t val = 1;
135+
[[maybe_unused]] auto rv = write( m_eventfd, &val, sizeof( val ) );
136+
#elif defined(__APPLE__)
137+
char c = 1;
138+
[[maybe_unused]] auto rv = write( m_writeFd, &c, 1 );
139+
#elif defined(_WIN32)
140+
char c = 1;
141+
send( m_writeFd, &c, 1, 0 );
142+
#endif
143+
}
144+
145+
// Clear the notification (call after processing)
146+
void clear()
147+
{
148+
std::lock_guard<std::mutex> guard( m_lock );
149+
m_notified = false;
150+
151+
#ifdef __linux__
152+
uint64_t val;
153+
[[maybe_unused]] auto rv = read( m_eventfd, &val, sizeof( val ) );
154+
#elif defined(__APPLE__)
155+
char buf[64];
156+
while( read( m_readFd, buf, sizeof( buf ) ) > 0 ) {}
157+
#elif defined(_WIN32)
158+
char buf[64];
159+
while( recv( m_readFd, buf, sizeof( buf ), 0 ) > 0 ) {}
160+
#endif
161+
}
162+
163+
bool isValid() const
164+
{
165+
#ifdef _WIN32
166+
return m_readFd != INVALID_SOCKET;
167+
#else
168+
return m_readFd >= 0;
169+
#endif
170+
}
171+
172+
private:
173+
#ifdef _WIN32
174+
void createSocketPair()
175+
{
176+
// Create a listening socket on localhost
177+
SOCKET listener = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
178+
if( listener == INVALID_SOCKET )
179+
return;
180+
181+
struct sockaddr_in addr;
182+
memset( &addr, 0, sizeof( addr ) );
183+
addr.sin_family = AF_INET;
184+
addr.sin_addr.s_addr = htonl( INADDR_LOOPBACK );
185+
addr.sin_port = 0; // Let OS pick a port
186+
187+
if( bind( listener, (struct sockaddr*)&addr, sizeof( addr ) ) == SOCKET_ERROR )
188+
{
189+
closesocket( listener );
190+
return;
191+
}
192+
193+
int addrlen = sizeof( addr );
194+
if( getsockname( listener, (struct sockaddr*)&addr, &addrlen ) == SOCKET_ERROR )
195+
{
196+
closesocket( listener );
197+
return;
198+
}
199+
200+
if( listen( listener, 1 ) == SOCKET_ERROR )
201+
{
202+
closesocket( listener );
203+
return;
204+
}
205+
206+
// Create client socket and connect
207+
m_writeFd = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
208+
if( m_writeFd == INVALID_SOCKET )
209+
{
210+
closesocket( listener );
211+
return;
212+
}
213+
214+
if( connect( m_writeFd, (struct sockaddr*)&addr, sizeof( addr ) ) == SOCKET_ERROR )
215+
{
216+
closesocket( m_writeFd );
217+
closesocket( listener );
218+
m_writeFd = INVALID_SOCKET;
219+
return;
220+
}
221+
222+
// Accept the connection
223+
m_readFd = accept( listener, NULL, NULL );
224+
closesocket( listener ); // Done with listener
225+
226+
if( m_readFd == INVALID_SOCKET )
227+
{
228+
closesocket( m_writeFd );
229+
m_writeFd = INVALID_SOCKET;
230+
return;
231+
}
232+
233+
// Set non-blocking
234+
u_long mode = 1;
235+
ioctlsocket( m_readFd, FIONBIO, &mode );
236+
ioctlsocket( m_writeFd, FIONBIO, &mode );
237+
}
238+
239+
SOCKET m_readFd;
240+
SOCKET m_writeFd;
241+
#else
242+
int m_readFd;
243+
int m_writeFd;
244+
#ifdef __linux__
245+
int m_eventfd;
246+
#endif
247+
#endif
248+
std::mutex m_lock;
249+
bool m_notified = false;
250+
};
251+
46252
}
47253

48254
#endif

0 commit comments

Comments
 (0)