@@ -18,8 +18,8 @@ import (
1818 "time"
1919 "unsafe"
2020
21- "github.com/restream/reindexer/v4 /bindings"
22- "github.com/restream/reindexer/v4 /cjson"
21+ "github.com/restream/reindexer/v5 /bindings"
22+ "github.com/restream/reindexer/v5 /cjson"
2323)
2424
2525const defCgoLimit = 2000
@@ -250,12 +250,13 @@ func (binding *Builtin) Init(u []url.URL, eh bindings.EventsHandler, options ...
250250 caps := * bindings .DefaultBindingCapabilities ().
251251 WithResultsWithShardIDs (true ).
252252 WithQrIdleTimeouts (true ).
253- WithIncarnationTags (true )
253+ WithIncarnationTags (true ).
254+ WithFloatRank (true )
254255 ccaps := C.BindingCapabilities {
255256 caps : C .int64_t (caps .Value ),
256257 }
257258
258- return err2go (C .reindexer_connect_v4 (binding .rx , str2c (u [0 ].Host + u [0 ].Path ), opts , str2c (bindings .ReindexerVersion ), ccaps ))
259+ return err2go (C .reindexer_connect (binding .rx , str2c (u [0 ].Host + u [0 ].Path ), opts , str2c (bindings .ReindexerVersion ), ccaps ))
259260}
260261
261262func (binding * Builtin ) StartWatchOnCtx (ctx context.Context ) (CCtxWrapper , error ) {
@@ -677,6 +678,7 @@ func (binding *Builtin) Finalize() error {
677678 if binding .eventsHandler != nil && binding .rx != 0 {
678679 binding .eventsHandler .Unsubscribe (binding .rx )
679680 }
681+ bufFree .free_buffers_sync ()
680682 C .destroy_reindexer (binding .rx )
681683 binding .rx = 0
682684 if binding .cgoLimiterStat != nil {
@@ -716,6 +718,10 @@ func (binding *Builtin) Unsubscribe(ctx context.Context) error {
716718 return binding .eventsHandler .Unsubscribe (binding .rx )
717719}
718720
721+ func (binding * Builtin ) DBMSVersion () (string , error ) {
722+ return C .GoString (C .reindexer_version ()), nil
723+ }
724+
719725func newBufFreeBatcher () (bf * bufFreeBatcher ) {
720726 bf = & bufFreeBatcher {
721727 bufs : make ([]* RawCBuffer , 0 , 100 ),
@@ -732,35 +738,46 @@ type bufFreeBatcher struct {
732738 cbufs []C.reindexer_resbuffer
733739 lock sync.Mutex
734740 kickCh chan struct {}
735- }
736741
737- func (bf * bufFreeBatcher ) loop () {
738- for {
739- <- bf .kickCh
742+ rxTerminationLock sync.Mutex
743+ }
740744
741- bf .lock .Lock ()
742- if len (bf .bufs ) == 0 {
743- bf .lock .Unlock ()
744- continue
745- }
746- bf .bufs , bf .bufs2 = bf .bufs2 , bf .bufs
745+ func (bf * bufFreeBatcher ) free_buffers_impl () {
746+ bf .rxTerminationLock .Lock ()
747+ defer bf .rxTerminationLock .Unlock ()
748+ bf .lock .Lock ()
749+ if len (bf .bufs ) == 0 {
747750 bf .lock .Unlock ()
751+ return
752+ }
753+ bf .bufs , bf .bufs2 = bf .bufs2 , bf .bufs
754+ bf .lock .Unlock ()
748755
749- for _ , buf := range bf .bufs2 {
750- bf .cbufs = append (bf .cbufs , buf .cbuf )
751- }
756+ for _ , buf := range bf .bufs2 {
757+ bf .cbufs = append (bf .cbufs , buf .cbuf )
758+ }
752759
753- C .reindexer_free_buffers (& bf .cbufs [0 ], C .int (len (bf .cbufs )))
760+ C .reindexer_free_buffers (& bf .cbufs [0 ], C .int (len (bf .cbufs )))
754761
755- for _ , buf := range bf .bufs2 {
756- buf .cbuf .results_ptr = 0
757- bf .toPool (buf )
758- }
759- bf .cbufs = bf .cbufs [:0 ]
760- bf .bufs2 = bf .bufs2 [:0 ]
762+ for _ , buf := range bf .bufs2 {
763+ buf .cbuf .results_ptr = 0
764+ bf .toPool (buf )
765+ }
766+ bf .cbufs = bf .cbufs [:0 ]
767+ bf .bufs2 = bf .bufs2 [:0 ]
768+ }
769+
770+ func (bf * bufFreeBatcher ) loop () {
771+ for {
772+ <- bf .kickCh
773+ bf .free_buffers_impl ()
761774 }
762775}
763776
777+ func (bf * bufFreeBatcher ) free_buffers_sync () {
778+ bf .free_buffers_impl ()
779+ }
780+
764781func (bf * bufFreeBatcher ) add (buf * RawCBuffer ) {
765782 if buf .cbuf .results_ptr != 0 {
766783 bf .toFree (buf )
0 commit comments