Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.12
github.com/openimsdk/tools v0.0.50-alpha.117
github.com/openimsdk/tools v0.0.50-alpha.119
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.11.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.117 h1:ACfijEVCeBcttT7OOkNGOOOvq14pJtb9szNIMHLm6Vc=
github.com/openimsdk/tools v0.0.50-alpha.117/go.mod h1:I0WESSa7ghPIo9BL+ETlH/qEIbO6+KZioM1jwNuDwz0=
github.com/openimsdk/tools v0.0.50-alpha.119 h1:S/RjRtL0ciwiG6pZKssbU//qVSWi7AuKHOPCHsQgz68=
github.com/openimsdk/tools v0.0.50-alpha.119/go.mod h1:I0WESSa7ghPIo9BL+ETlH/qEIbO6+KZioM1jwNuDwz0=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
Expand Down
18 changes: 3 additions & 15 deletions internal/msggateway/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/openimsdk/tools/apiresp"

"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
Expand Down Expand Up @@ -288,25 +289,12 @@ func (ws *WsServer) registerClient(client *Client) {
}
}

wg := sync.WaitGroup{}
log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", "discoveryEnable", ws.msgGatewayConfig.Discovery.Enable)

if ws.msgGatewayConfig.Discovery.Enable != "k8s" {
wg.Add(1)
go func() {
defer wg.Done()
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
if ws.msgGatewayConfig.Discovery.Enable != config.KUBERNETES {
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}

//wg.Add(1)
//go func() {
// defer wg.Done()
// ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
//}()

wg.Wait()

log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load())
}

Expand Down
7 changes: 4 additions & 3 deletions internal/push/onlinepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
Expand Down Expand Up @@ -39,11 +40,11 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *

func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
switch config.Discovery.Enable {
case "k8s":
return NewK8sStaticConsistentHash(disCov, config)
case conf.KUBERNETES:
return NewDefaultAllNode(disCov, config)
case "zookeeper":
return NewDefaultAllNode(disCov, config)
case "etcd":
case conf.ETCD:
return NewDefaultAllNode(disCov, config)
default:
return newEmptyOnlinePusher()
Expand Down
2 changes: 1 addition & 1 deletion internal/tools/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestName(t *testing.T) {
Address: []string{"localhost:12379"},
},
}
client, err := kdisc.NewDiscoveryRegister(conf, "source")
client, err := kdisc.NewDiscoveryRegister(conf, &config.Share{}, nil)
if err != nil {
panic(err)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,14 @@ type ZooKeeper struct {
}

type Discovery struct {
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
Kubernetes Kubernetes `mapstructure:"kubernetes"`
ZooKeeper ZooKeeper `mapstructure:"zooKeeper"`
}

type Kubernetes struct {
Namespace string `mapstructure:"namespace"`
}

type Etcd struct {
Expand Down
24 changes: 23 additions & 1 deletion pkg/common/config/load_config_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package config

import (
"github.com/stretchr/testify/assert"
"os"
"path/filepath"

"testing"

"github.com/stretchr/testify/assert"
)

func TestLoadLogConfig(t *testing.T) {
Expand All @@ -27,6 +31,24 @@ func TestLoadWebhooksConfig(t *testing.T) {

}

func TestLoadDiscoveryKubernetesConfig(t *testing.T) {
path := filepath.Join(t.TempDir(), "discovery.yml")
err := os.WriteFile(path, []byte(`enable: kubernetes
kubernetes:
namespace: openim
etcd:
rootDirectory: openim
address: [localhost:12379]
`), 0600)
assert.Nil(t, err)

var discovery Discovery
err = LoadConfig(path, "IMENV_DISCOVERY", &discovery)
assert.Nil(t, err)
assert.Equal(t, KUBERNETES, discovery.Enable)
assert.Equal(t, "openim", discovery.Kubernetes.Namespace)
}

func TestLoadOpenIMRpcUserConfig(t *testing.T) {
var user User
err := LoadConfig("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", &user)
Expand Down
15 changes: 11 additions & 4 deletions pkg/common/discoveryregister/discoveryregister.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,26 @@ import (
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/discovery/kubernetes"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
)

// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
switch discovery.Enable {
case "k8s":
return kubernetes.NewConnManager("default", watchNames,
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
namespace := discovery.Kubernetes.Namespace
if namespace == "" {
namespace = "default"
}
return kubernetes.NewConnManager(namespace, watchNames,
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(1024*1024*20),
),
)
case "etcd":
}

switch discovery.Enable {
case config.ETCD:
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
Expand Down
34 changes: 25 additions & 9 deletions pkg/common/discoveryregister/discoveryregister_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@
package discoveryregister

import (
"os"
)
"strings"
"testing"

func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1")
os.Setenv("ZOOKEEPER_PORT", "12181")
os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/runtimeenv"
)

//func TestNewDiscoveryRegister(t *testing.T) {
// setupTestEnvironment()
Expand Down Expand Up @@ -58,3 +54,23 @@ func setupTestEnvironment() {
// }
// }
//}

func TestNewDiscoveryRegisterRejectsKubernetesOutsideCluster(t *testing.T) {
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
t.Skip("outside-cluster fallback is only relevant outside Kubernetes")
}
discovery := &config.Discovery{
Enable: config.KUBERNETES,
Kubernetes: config.Kubernetes{
Namespace: "default",
},
}

client, err := NewDiscoveryRegister(discovery, &config.Share{}, nil)
if err == nil && client != nil {
client.Close()
}
if err == nil || !strings.Contains(err.Error(), "unsupported discovery type") {
t.Fatalf("%q outside Kubernetes should not select Kubernetes discovery: %v", config.KUBERNETES, err)
}
}
Loading