logs-analyzer/signoz/pkg/query-service/agentConf/db.go

280 lines
6.8 KiB
Go
Raw Permalink Normal View History

2024-09-02 22:47:30 +03:00
package agentConf
import (
"context"
"database/sql"
"fmt"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/agentConf/sqlite"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
// Repo handles DDL and DML ops on ingestion rules
type Repo struct {
db *sqlx.DB
}
func (r *Repo) initDB(engine string) error {
switch engine {
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
}
func (r *Repo) GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
var c []ConfigVersion
err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT
version,
id,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT NAME FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result,
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions AS v
WHERE element_type = $1
ORDER BY created_at desc, version desc
limit %v`, limit),
typ)
if err != nil {
return nil, model.InternalError(err)
}
incompleteStatuses := []DeployStatus{DeployInitiated, Deploying}
for idx := 1; idx < len(c); idx++ {
if slices.Contains(incompleteStatuses, c[idx].DeployStatus) {
c[idx].DeployStatus = DeployStatusUnknown
}
}
return c, nil
}
func (r *Repo) GetConfigVersion(
ctx context.Context, typ ElementTypeDef, v int,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT
id,
version,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT NAME FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result,
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions v
WHERE element_type = $1
AND version = $2`, typ, v)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
return nil, model.InternalError(err)
}
return &c, nil
}
func (r *Repo) GetLatestVersion(
ctx context.Context, typ ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT
id,
version,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT NAME FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result
FROM agent_config_versions AS v
WHERE element_type = $1
AND version = (
SELECT MAX(version)
FROM agent_config_versions
WHERE element_type=$2)`, typ, typ)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
return nil, model.InternalError(err)
}
return &c, nil
}
func (r *Repo) insertConfig(
ctx context.Context, userId string, c *ConfigVersion, elements []string,
) (fnerr *model.ApiError) {
if string(c.ElementType) == "" {
return model.BadRequest(fmt.Errorf(
"element type is required for creating agent config version",
))
}
// allowing empty elements for logs - use case is deleting all pipelines
if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines {
zap.L().Error("insert config called with no elements ", zap.String("ElementType", string(c.ElementType)))
return model.BadRequest(fmt.Errorf("config must have atleast one element"))
}
if c.Version != 0 {
// the version can not be set by the user, we want to auto-assign the versions
// in a monotonically increasing order starting with 1. hence, we reject insert
// requests with version anything other than 0. here, 0 indicates un-assigned
zap.L().Error("invalid version assignment while inserting agent config", zap.Int("version", c.Version), zap.String("ElementType", string(c.ElementType)))
return model.BadRequest(fmt.Errorf(
"user defined versions are not supported in the agent config",
))
}
configVersion, err := r.GetLatestVersion(ctx, c.ElementType)
if err != nil && err.Type() != model.ErrorNotFound {
zap.L().Error("failed to fetch latest config version", zap.Error(err))
return model.InternalError(fmt.Errorf("failed to fetch latest config version"))
}
if configVersion != nil {
c.Version = updateVersion(configVersion.Version)
} else {
// first version
c.Version = 1
}
defer func() {
if fnerr != nil {
// remove all the damage (invalid rows from db)
r.db.Exec("DELETE FROM agent_config_versions WHERE id = $1", c.ID)
r.db.Exec("DELETE FROM agent_config_elements WHERE version_id=$1", c.ID)
}
}()
// insert config
configQuery := `INSERT INTO agent_config_versions(
id,
version,
created_by,
element_type,
active,
is_valid,
disabled,
deploy_status,
deploy_result)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
_, dbErr := r.db.ExecContext(ctx,
configQuery,
c.ID,
c.Version,
userId,
c.ElementType,
false,
false,
false,
c.DeployStatus,
c.DeployResult)
if dbErr != nil {
zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule"))
}
elementsQuery := `INSERT INTO agent_config_elements(
id,
version_id,
element_type,
element_id)
VALUES ($1, $2, $3, $4)`
for _, e := range elements {
_, dbErr = r.db.ExecContext(
ctx,
elementsQuery,
uuid.NewString(),
c.ID,
c.ElementType,
e,
)
if dbErr != nil {
return model.InternalError(dbErr)
}
}
return nil
}
func (r *Repo) updateDeployStatus(ctx context.Context,
elementType ElementTypeDef,
version int,
status string,
result string,
lastHash string,
lastconf string) *model.ApiError {
updateQuery := `UPDATE agent_config_versions
set deploy_status = $1,
deploy_result = $2,
last_hash = COALESCE($3, last_hash),
last_config = $4
WHERE version=$5
AND element_type = $6`
_, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType))
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
}
return nil
}
func (r *Repo) updateDeployStatusByHash(
ctx context.Context, confighash string, status string, result string,
) *model.ApiError {
updateQuery := `UPDATE agent_config_versions
set deploy_status = $1,
deploy_result = $2
WHERE last_hash=$4`
_, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
return model.InternalError(errors.Wrap(err, "failed to update deploy status"))
}
return nil
}