Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 53 additions & 16 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"net"
"net/http"
"net/netip"
"os"
"path/filepath"
"strings"
"time"

"github.com/google/uuid"
)

const (
Expand All @@ -23,6 +27,7 @@ const (
callHomeSleepTime = 30 * time.Minute
backOff = 10 * time.Second
apiKey = "77e04a7c-f207-40dd-8950-c344871fd516"
defDeploymentID = "/var/lib/magistrala/callhome/deployment_id"
)

var ipEndpoints = []string{
Expand All @@ -32,20 +37,22 @@ var ipEndpoints = []string{
}

type homingService struct {
serviceName string
version string
logger *slog.Logger
cancel context.CancelFunc
httpClient http.Client
serviceName string
version string
deploymentID string
logger *slog.Logger
cancel context.CancelFunc
httpClient http.Client
}

func New(svc, version string, homingLogger *slog.Logger, cancel context.CancelFunc) *homingService {
return &homingService{
serviceName: svc,
version: version,
logger: homingLogger,
cancel: cancel,
httpClient: *http.DefaultClient,
serviceName: svc,
version: version,
deploymentID: getDeploymentID(homingLogger),
logger: homingLogger,
cancel: cancel,
httpClient: *http.DefaultClient,
}
}

Expand All @@ -56,9 +63,10 @@ func (hs *homingService) CallHome(ctx context.Context) {
hs.Stop()
default:
data := telemetryData{
Service: hs.serviceName,
Version: hs.version,
LastSeen: time.Now(),
Service: hs.serviceName,
Version: hs.version,
DeploymentID: hs.deploymentID,
LastSeen: time.Now(),
}

var macAddr string
Expand Down Expand Up @@ -118,9 +126,10 @@ type telemetryData struct {
IPAddress string `json:"ip_address"`
// MAC address is used to identify unique machine to avoid duplicates in case
// of multiple services running on the same machine (such as a Docker composition).
MACAddress string `json:"mac_address"`
Version string `json:"magistrala_version"`
LastSeen time.Time `json:"last_seen"`
MACAddress string `json:"mac_address"`
DeploymentID string `json:"deployment_id"`
Version string `json:"magistrala_version"`
LastSeen time.Time `json:"last_seen"`
}

func (hs *homingService) getIP(endpoint string) (string, error) {
Expand Down Expand Up @@ -160,3 +169,31 @@ func (hs *homingService) send(telDat *telemetryData) error {
}
return nil
}

func getDeploymentID(logger *slog.Logger) string {
if id := os.Getenv("MG_DEPLOYMENT_ID"); id != "" {
return id
Comment thread
SammyOina marked this conversation as resolved.
}

path := defDeploymentID
if p := os.Getenv("MG_CALLHOME_DEPLOYMENT_ID_FILE"); p != "" {
path = p
}

if id, err := os.ReadFile(path); err == nil {
return string(id)
Comment thread
SammyOina marked this conversation as resolved.
}

id := uuid.New().String()
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
logger.Warn(fmt.Sprintf("failed to create directory for deployment id with error: %v", err))
return ""
}

if err := os.WriteFile(path, []byte(id), 0o644); err != nil {
logger.Warn(fmt.Sprintf("failed to write deployment id with error: %v", err))
return ""
}
Comment thread
SammyOina marked this conversation as resolved.

return id
}
23 changes: 12 additions & 11 deletions telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (
)

type Telemetry struct {
Services pq.StringArray `json:"services,omitempty" db:"services"`
Service string `json:"service,omitempty" db:"service"`
Longitude float64 `json:"longitude,omitempty" db:"longitude"`
Latitude float64 `json:"latitude,omitempty" db:"latitude"`
IpAddress string `json:"-" db:"ip_address"`
MacAddress string `json:"-" db:"mac_address"`
Version string `json:"magistrala_version,omitempty" db:"mg_version"`
LastSeen time.Time `json:"last_seen" db:"service_time"`
Country string `json:"country,omitempty" db:"country"`
City string `json:"city,omitempty" db:"city"`
ServiceTime time.Time `json:"timestamp" db:"time"`
Services pq.StringArray `json:"services,omitempty" db:"services"`
Service string `json:"service,omitempty" db:"service"`
Longitude float64 `json:"longitude,omitempty" db:"longitude"`
Latitude float64 `json:"latitude,omitempty" db:"latitude"`
IpAddress string `json:"-" db:"ip_address"`
MacAddress string `json:"-" db:"mac_address"`
DeploymentID string `json:"-" db:"deployment_id"`
Version string `json:"magistrala_version,omitempty" db:"mg_version"`
LastSeen time.Time `json:"last_seen" db:"service_time"`
Country string `json:"country,omitempty" db:"country"`
City string `json:"city,omitempty" db:"city"`
ServiceTime time.Time `json:"timestamp" db:"time"`
}

type TelemetryFilters struct {
Expand Down
11 changes: 11 additions & 0 deletions timescale/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ func Migration() migrate.MemoryMigrationSource {
`SELECT add_retention_policy('telemetry', INTERVAL '90 days');`,
},
},
{
Id: "telemetry_8",
Up: []string{
`ALTER TABLE telemetry ADD deployment_id TEXT;`,
`CREATE INDEX IF NOT EXISTS idx_telemetry_deployment_or_ip ON telemetry (COALESCE(deployment_id, ip_address));`,
},
Down: []string{
`DROP INDEX IF EXISTS idx_telemetry_deployment_id;`,
`ALTER TABLE telemetry DROP COLUMN deployment_id;`,
},
},
},
}
}
19 changes: 10 additions & 9 deletions timescale/timescale.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func (r repo) RetrieveAll(ctx context.Context, pm callhome.PageMetadata, filters
// DISTINCT ON is much faster than ROW_NUMBER() window function
q := fmt.Sprintf(`
WITH latest_per_ip AS (
SELECT DISTINCT ON (ip_address)
SELECT DISTINCT ON (COALESCE(deployment_id, ip_address))
ip_address,
deployment_id,
time,
service_time,
longitude,
Expand All @@ -46,7 +47,7 @@ func (r repo) RetrieveAll(ctx context.Context, pm callhome.PageMetadata, filters
city
FROM telemetry
%s
ORDER BY ip_address, time DESC
ORDER BY COALESCE(deployment_id, ip_address), time DESC
),
limited_ips AS (
SELECT *
Expand All @@ -56,11 +57,11 @@ func (r repo) RetrieveAll(ctx context.Context, pm callhome.PageMetadata, filters
),
services_per_ip AS (
SELECT
t.ip_address,
COALESCE(t.deployment_id, t.ip_address) as id,
ARRAY_AGG(DISTINCT t.service) as services
FROM limited_ips lpi
INNER JOIN telemetry t ON t.ip_address = lpi.ip_address
GROUP BY t.ip_address
INNER JOIN telemetry t ON COALESCE(t.deployment_id, t.ip_address) = COALESCE(lpi.deployment_id, lpi.ip_address)
GROUP BY COALESCE(t.deployment_id, t.ip_address)
)
SELECT
lpi.ip_address,
Expand All @@ -73,7 +74,7 @@ func (r repo) RetrieveAll(ctx context.Context, pm callhome.PageMetadata, filters
lpi.city,
s.services
FROM limited_ips lpi
LEFT JOIN services_per_ip s ON lpi.ip_address = s.ip_address
LEFT JOIN services_per_ip s ON COALESCE(lpi.deployment_id, lpi.ip_address) = s.id
ORDER BY lpi.time DESC;
`, filterQuery)

Expand Down Expand Up @@ -105,9 +106,9 @@ func (r repo) RetrieveAll(ctx context.Context, pm callhome.PageMetadata, filters

// Save creates record in repo.
func (r repo) Save(ctx context.Context, t callhome.Telemetry) error {
q := `INSERT INTO telemetry (ip_address, mac_address, longitude, latitude,
q := `INSERT INTO telemetry (ip_address, mac_address, deployment_id, longitude, latitude,
mg_version, service, time, country, city, service_time)
VALUES (:ip_address, :mac_address, :longitude, :latitude,
VALUES (:ip_address, :mac_address, :deployment_id, :longitude, :latitude,
:mg_version, :service, :time, :country, :city, :service_time);`

tx, err := r.db.BeginTxx(ctx, nil)
Expand Down Expand Up @@ -147,7 +148,7 @@ func (r repo) RetrieveSummary(ctx context.Context, filters callhome.TelemetryFil
q := fmt.Sprintf(`
SELECT
country,
COUNT(DISTINCT ip_address) as number_of_deployments,
COUNT(DISTINCT COALESCE(deployment_id, ip_address)) as number_of_deployments,
ARRAY_AGG(DISTINCT city) FILTER (WHERE city IS NOT NULL) as cities,
ARRAY_AGG(DISTINCT service) FILTER (WHERE service IS NOT NULL) as services,
ARRAY_AGG(DISTINCT mg_version) FILTER (WHERE mg_version IS NOT NULL) as versions
Expand Down