Skip to content

Commit 0d12d3b

Browse files
committed
fix(cli): harden search fallback and lock cleanup
1 parent 4e3b19d commit 0d12d3b

3 files changed

Lines changed: 103 additions & 52 deletions

File tree

internal/cli/cli.go

Lines changed: 61 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ type commandContext struct {
5050

5151
const liveHTTPTimeout = 90 * time.Second
5252

53+
func withLock(dbPath string, fn func() error) error {
54+
lck, err := lock.Acquire(dbPath)
55+
if err != nil {
56+
return err
57+
}
58+
runErr := fn()
59+
releaseErr := lck.Release()
60+
if runErr != nil {
61+
return runErr
62+
}
63+
if releaseErr != nil {
64+
return fmt.Errorf("release lock: %w", releaseErr)
65+
}
66+
return nil
67+
}
68+
5369
type kongExit int
5470

5571
func Run(ctx context.Context, args []string, stdout, stderr io.Writer) (err error) {
@@ -251,13 +267,12 @@ func (cmd syncCmd) Run(ctx commandContext) error {
251267
if err := config.EnsureDirs(rt); err != nil {
252268
return err
253269
}
254-
lck, err := lock.Acquire(rt.Config.DBPath)
255-
if err != nil {
256-
return err
257-
}
258-
defer lck.Release()
259-
result, err := store.SyncFixture(ctx, rt.Config.DBPath, fixture)
260-
if err != nil {
270+
var result store.SyncResult
271+
if err := withLock(rt.Config.DBPath, func() error {
272+
var syncErr error
273+
result, syncErr = store.SyncFixture(ctx, rt.Config.DBPath, fixture)
274+
return syncErr
275+
}); err != nil {
261276
return err
262277
}
263278
return writeMaybeJSON(ctx.stdout, cmd.JSON, result)
@@ -307,11 +322,6 @@ func (cmd syncCmd) Run(ctx commandContext) error {
307322
if config.IntercomToken() == "" {
308323
return fmt.Errorf("missing %s for live Intercom sync", config.EnvIntercomCred)
309324
}
310-
lck, err := lock.Acquire(rt.Config.DBPath)
311-
if err != nil {
312-
return err
313-
}
314-
defer lck.Release()
315325
client := intercom.Client{
316326
BaseURL: config.IntercomBaseURL(),
317327
Token: config.IntercomToken(),
@@ -332,16 +342,19 @@ func (cmd syncCmd) Run(ctx commandContext) error {
332342
}
333343
s := syncer.IntercomSyncer{Client: client}
334344
var result store.SyncResult
335-
if cmd.Conversation != "" {
336-
result, err = s.SyncConversation(ctx, rt.Config.DBPath, cmd.Conversation)
337-
} else if cmd.Entities {
338-
result, err = s.SyncEntities(ctx, rt.Config.DBPath, syncer.EntitySyncOptions{IncludeContacts: cmd.Contacts, ContactLimit: cmd.Limit})
339-
} else if cmd.Resume {
340-
result, err = s.ResumeTail(ctx, rt.Config.DBPath, cmd.Limit)
341-
} else {
342-
result, err = s.SyncUpdatedSince(ctx, rt.Config.DBPath, updatedAfter, updatedBefore, cmd.Limit)
343-
}
344-
if err != nil {
345+
if err := withLock(rt.Config.DBPath, func() error {
346+
var syncErr error
347+
if cmd.Conversation != "" {
348+
result, syncErr = s.SyncConversation(ctx, rt.Config.DBPath, cmd.Conversation)
349+
} else if cmd.Entities {
350+
result, syncErr = s.SyncEntities(ctx, rt.Config.DBPath, syncer.EntitySyncOptions{IncludeContacts: cmd.Contacts, ContactLimit: cmd.Limit})
351+
} else if cmd.Resume {
352+
result, syncErr = s.ResumeTail(ctx, rt.Config.DBPath, cmd.Limit)
353+
} else {
354+
result, syncErr = s.SyncUpdatedSince(ctx, rt.Config.DBPath, updatedAfter, updatedBefore, cmd.Limit)
355+
}
356+
return syncErr
357+
}); err != nil {
345358
return err
346359
}
347360
return writeMaybeJSON(ctx.stdout, cmd.JSON, result)
@@ -564,13 +577,12 @@ func (cmd publishCmd) Run(ctx commandContext) error {
564577
records := archive.FixtureRecords(fixture)
565578
return writeMaybeJSON(ctx.stdout, cmd.JSON, archiveDryRun("publish", cmd.Out, len(records)))
566579
}
567-
lck, err := lock.Acquire(rt.Config.DBPath)
568-
if err != nil {
569-
return err
570-
}
571-
defer lck.Release()
572-
fixture, err := store.ExportFixture(ctx, rt.Config.DBPath)
573-
if err != nil {
580+
var fixture store.Fixture
581+
if err := withLock(rt.Config.DBPath, func() error {
582+
var exportErr error
583+
fixture, exportErr = store.ExportFixture(ctx, rt.Config.DBPath)
584+
return exportErr
585+
}); err != nil {
574586
return err
575587
}
576588
records := archive.FixtureRecords(fixture)
@@ -622,13 +634,12 @@ func (cmd importCmd) Run(ctx commandContext) error {
622634
if err := config.EnsureDirs(rt); err != nil {
623635
return err
624636
}
625-
lck, err := lock.Acquire(rt.Config.DBPath)
626-
if err != nil {
627-
return err
628-
}
629-
defer lck.Release()
630-
result, err := store.SyncFixture(ctx, rt.Config.DBPath, fixture)
631-
if err != nil {
637+
var result store.SyncResult
638+
if err := withLock(rt.Config.DBPath, func() error {
639+
var syncErr error
640+
result, syncErr = store.SyncFixture(ctx, rt.Config.DBPath, fixture)
641+
return syncErr
642+
}); err != nil {
632643
return err
633644
}
634645
return writeMaybeJSON(ctx.stdout, cmd.JSON, importResult{Input: cmd.In, Records: len(records), Sync: result})
@@ -709,22 +720,22 @@ func (cmd subscribeCmd) Run(ctx commandContext) error {
709720
if err := config.EnsureDirs(rt); err != nil {
710721
return err
711722
}
712-
lck, err := lock.Acquire(rt.Config.DBPath)
713-
if err != nil {
714-
return err
715-
}
716-
defer lck.Release()
717723
aggregate := store.SyncResult{}
718-
for index, item := range pending {
719-
syncResult, err := store.SyncFixture(ctx, rt.Config.DBPath, item.fixture)
720-
if err != nil {
721-
return fmt.Errorf("import tenant store snapshot %s: %w", item.snapshot.Path, err)
724+
if err := withLock(rt.Config.DBPath, func() error {
725+
for index, item := range pending {
726+
syncResult, err := store.SyncFixture(ctx, rt.Config.DBPath, item.fixture)
727+
if err != nil {
728+
return fmt.Errorf("import tenant store snapshot %s: %w", item.snapshot.Path, err)
729+
}
730+
mergeSyncResult(&aggregate, syncResult)
731+
result.Snapshots[index].Records = len(item.records)
732+
result.Snapshots[index].Imported = true
733+
result.Snapshots[index].Path = item.snapshot.Path
734+
result.ImportedSnapshots++
722735
}
723-
mergeSyncResult(&aggregate, syncResult)
724-
result.Snapshots[index].Records = len(item.records)
725-
result.Snapshots[index].Imported = true
726-
result.Snapshots[index].Path = item.snapshot.Path
727-
result.ImportedSnapshots++
736+
return nil
737+
}); err != nil {
738+
return err
728739
}
729740
result.Records = totalRecords
730741
result.Sync = &aggregate

internal/store/helpers_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,31 @@ func TestSearchOnEmptyDBReturnsNoResults(t *testing.T) {
347347
}
348348
}
349349

350+
func TestSearchFTSErrorDoesNotFallbackToLike(t *testing.T) {
351+
ctx := context.Background()
352+
dbPath := filepath.Join(t.TempDir(), "archive.db")
353+
fixture, err := LoadFixture(filepath.Join("..", "..", "testdata", "synthetic"))
354+
if err != nil {
355+
t.Fatal(err)
356+
}
357+
if _, err := SyncFixture(ctx, dbPath, fixture); err != nil {
358+
t.Fatal(err)
359+
}
360+
st, err := ckstore.Open(ctx, ckstore.Options{Path: dbPath, Schema: Schema, SchemaVersion: SchemaVersion})
361+
if err != nil {
362+
t.Fatal(err)
363+
}
364+
if _, err := st.DB().ExecContext(ctx, `drop table conversations`); err != nil {
365+
t.Fatal(err)
366+
}
367+
if err := st.Close(); err != nil {
368+
t.Fatal(err)
369+
}
370+
if _, err := SearchWithOptions(ctx, dbPath, "invoice", SearchOptions{Limit: 10}); err == nil || !strings.Contains(err.Error(), "search fts") {
371+
t.Fatalf("SearchWithOptions error = %v, want FTS error", err)
372+
}
373+
}
374+
350375
func TestSearchWithStateFilterUsesLikePath(t *testing.T) {
351376
ctx := context.Background()
352377
dbPath := filepath.Join(t.TempDir(), "archive.db")

internal/store/search.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,27 @@ func SearchWithOptions(ctx context.Context, dbPath, query string, opts SearchOpt
5757
}
5858
defer st.Close()
5959
results, err := searchFTS(ctx, st.DB(), query, opts)
60-
if err == nil && len(results) > 0 {
61-
return results, nil
60+
if err == nil {
61+
if len(results) > 0 {
62+
return results, nil
63+
}
64+
return searchLike(ctx, st.DB(), query, opts)
65+
}
66+
if !isLegacyFTSError(err) {
67+
return nil, fmt.Errorf("search fts: %w", err)
6268
}
6369
return searchLike(ctx, st.DB(), query, opts)
6470
}
6571

72+
func isLegacyFTSError(err error) bool {
73+
if err == nil {
74+
return false
75+
}
76+
msg := err.Error()
77+
return strings.Contains(msg, "no such table: conversation_fts") ||
78+
strings.Contains(msg, "no such module: fts5")
79+
}
80+
6681
func normalizeSearchOptions(opts SearchOptions) SearchOptions {
6782
if opts.Limit <= 0 {
6883
opts.Limit = 20

0 commit comments

Comments
 (0)