logs-analyzer/signoz/pkg/query-service/app/opamp/mocks.go

148 lines
3.8 KiB
Go
Raw Permalink Normal View History

2024-09-02 22:47:30 +03:00
package opamp
import (
"context"
"log"
"net"
"github.com/google/uuid"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/pkg/errors"
)
type MockOpAmpConnection struct {
ServerToAgentMsgs []*protobufs.ServerToAgent
}
func (conn *MockOpAmpConnection) Send(ctx context.Context, msg *protobufs.ServerToAgent) error {
conn.ServerToAgentMsgs = append(conn.ServerToAgentMsgs, msg)
return nil
}
func (conn *MockOpAmpConnection) LatestMsgFromServer() *protobufs.ServerToAgent {
if len(conn.ServerToAgentMsgs) < 1 {
return nil
}
return conn.ServerToAgentMsgs[len(conn.ServerToAgentMsgs)-1]
}
func (conn *MockOpAmpConnection) ClearMsgsFromServer() []*protobufs.ServerToAgent {
msgs := conn.ServerToAgentMsgs
conn.ServerToAgentMsgs = []*protobufs.ServerToAgent{}
return msgs
}
func (conn *MockOpAmpConnection) Disconnect() error {
return nil
}
func (conn *MockOpAmpConnection) RemoteAddr() net.Addr {
return nil
}
// Implements opamp.AgentConfigProvider
type MockAgentConfigProvider struct {
// An updated config is recommended by TestAgentConfProvider
// if `ZPagesEndpoint` is not empty
ZPagesEndpoint string
ConfigUpdateSubscribers map[string]func()
// { configId: { agentId: isOk } }
ReportedDeploymentStatuses map[string]map[string]bool
}
func NewMockAgentConfigProvider() *MockAgentConfigProvider {
return &MockAgentConfigProvider{
ConfigUpdateSubscribers: map[string]func(){},
ReportedDeploymentStatuses: map[string]map[string]bool{},
}
}
// Test helper.
func (ta *MockAgentConfigProvider) HasRecommendations() bool {
return len(ta.ZPagesEndpoint) > 0
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) (
[]byte, string, error,
) {
if len(ta.ZPagesEndpoint) < 1 {
return baseConfYaml, "agent-base-config", nil
}
k := koanf.New(".")
err := k.Load(rawbytes.Provider(baseConfYaml), yaml.Parser())
if err != nil {
return nil, "", errors.Wrap(err, "could not unmarshal baseConf")
}
k.Set("extensions.zpages.endpoint", ta.ZPagesEndpoint)
recommendedYaml, err := k.Marshal(yaml.Parser())
if err != nil {
return nil, "", errors.Wrap(err, "could not marshal recommended conf")
}
confId := ta.ZPagesEndpoint
return recommendedYaml, confId, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) ReportConfigDeploymentStatus(
agentId string,
configId string,
err error,
) {
confIdReports := ta.ReportedDeploymentStatuses[configId]
if confIdReports == nil {
confIdReports = map[string]bool{}
ta.ReportedDeploymentStatuses[configId] = confIdReports
}
confIdReports[agentId] = (err == nil)
}
// Test helper.
func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(
configId string, agentId string,
) bool {
confIdReports := ta.ReportedDeploymentStatuses[configId]
if confIdReports == nil {
return false
}
_, exists := confIdReports[agentId]
return exists
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
subscriberId := uuid.NewString()
ta.ConfigUpdateSubscribers[subscriberId] = callback
return func() {
delete(ta.ConfigUpdateSubscribers, subscriberId)
}
}
// test helper.
func (ta *MockAgentConfigProvider) NotifySubscribersOfChange() {
for _, callback := range ta.ConfigUpdateSubscribers {
callback()
}
}
// Brought in from https://github.com/open-telemetry/opamp-go/blob/main/internal/testhelpers/nethelpers.go
func GetAvailableLocalAddress() string {
ln, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
log.Fatalf("failed to get a free local port: %v", err)
}
// There is a possible race if something else takes this same port before
// the test uses it, however, that is unlikely in practice.
defer ln.Close()
return ln.Addr().String()
}