437 lines
14 KiB
Go
437 lines
14 KiB
Go
package queryBuilder
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/SigNoz/govaluate"
|
|
logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
|
|
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
|
"go.signoz.io/signoz/pkg/query-service/cache"
|
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
|
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// SupportedFunctions lists the function names that may be used inside formula
// expressions. init registers every name in EvalFuncs.
var SupportedFunctions = []string{
	"exp", "log", "ln", "exp2", "log2", "exp10", "log10",
	"sqrt", "cbrt",
	"erf", "erfc", "lgamma", "tgamma",
	"sin", "cos", "tan", "asin", "acos", "atan",
	"degrees", "radians",
	"now", "toUnixTimestamp",
}
|
|
|
|
// EvalFuncs maps a function name to the govaluate function used when parsing
// formula expressions. It starts empty and is filled by init with one entry
// per name in SupportedFunctions.
var EvalFuncs = make(map[string]govaluate.ExpressionFunction)
|
|
|
|
type prepareTracesQueryFunc func(start, end int64, panelType v3.PanelType, bq *v3.BuilderQuery, keys map[string]v3.AttributeKey, options tracesV3.Options) (string, error)
|
|
type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options logsV3.Options) (string, error)
|
|
type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options metricsV3.Options) (string, error)
|
|
|
|
type QueryBuilder struct {
|
|
options QueryBuilderOptions
|
|
featureFlags interfaces.FeatureLookup
|
|
}
|
|
|
|
type QueryBuilderOptions struct {
|
|
BuildTraceQuery prepareTracesQueryFunc
|
|
BuildLogQuery prepareLogsQueryFunc
|
|
BuildMetricQuery prepareMetricQueryFunc
|
|
}
|
|
|
|
func NewQueryBuilder(options QueryBuilderOptions, featureFlags interfaces.FeatureLookup) *QueryBuilder {
|
|
return &QueryBuilder{
|
|
options: options,
|
|
featureFlags: featureFlags,
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
for _, fn := range SupportedFunctions {
|
|
EvalFuncs[fn] = func(args ...interface{}) (interface{}, error) {
|
|
return nil, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// unique returns the distinct values of slice in order of first appearance.
// The result is always a non-nil slice, even when the input is nil or empty.
func unique(slice []string) []string {
	seen := make(map[string]struct{})
	result := []string{}
	for i := 0; i < len(slice); i++ {
		item := slice[i]
		if _, dup := seen[item]; dup {
			continue
		}
		seen[item] = struct{}{}
		result = append(result, item)
	}
	return result
}
|
|
|
|
// expressionToQuery constructs the query for the expression
|
|
func expressionToQuery(
|
|
qp *v3.QueryRangeParamsV3,
|
|
varToQuery map[string]string,
|
|
expression *govaluate.EvaluableExpression,
|
|
queryName string,
|
|
) (string, error) {
|
|
var formulaQuery string
|
|
variables := unique(expression.Vars())
|
|
|
|
var modified []govaluate.ExpressionToken
|
|
tokens := expression.Tokens()
|
|
for idx := range tokens {
|
|
token := tokens[idx]
|
|
if token.Kind == govaluate.VARIABLE {
|
|
token.Value = fmt.Sprintf("%v.value", token.Value)
|
|
token.Meta = fmt.Sprintf("%v.value", token.Meta)
|
|
}
|
|
modified = append(modified, token)
|
|
}
|
|
// err should be nil here since the expression is already validated
|
|
formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
|
|
|
|
var formulaSubQuery string
|
|
var joinUsing string
|
|
var prevVar string
|
|
for idx, variable := range variables {
|
|
query := varToQuery[variable]
|
|
groupTags := []string{}
|
|
for _, tag := range qp.CompositeQuery.BuilderQueries[variable].GroupBy {
|
|
groupTags = append(groupTags, tag.Key)
|
|
}
|
|
groupTags = append(groupTags, "ts")
|
|
if joinUsing == "" {
|
|
for _, tag := range groupTags {
|
|
joinUsing += fmt.Sprintf("%s.`%s` as `%s`, ", variable, tag, tag)
|
|
}
|
|
joinUsing = strings.TrimSuffix(joinUsing, ", ")
|
|
}
|
|
formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, variable)
|
|
if idx > 0 {
|
|
formulaSubQuery += " ON "
|
|
for _, tag := range groupTags {
|
|
formulaSubQuery += fmt.Sprintf("%s.`%s` = %s.`%s` AND ", prevVar, tag, variable, tag)
|
|
}
|
|
formulaSubQuery = strings.TrimSuffix(formulaSubQuery, " AND ")
|
|
}
|
|
if idx < len(variables)-1 {
|
|
formulaSubQuery += " INNER JOIN "
|
|
}
|
|
prevVar = variable
|
|
}
|
|
formulaQuery = fmt.Sprintf("SELECT %s, %s as value FROM ", joinUsing, formula.ExpressionString()) + formulaSubQuery
|
|
if len(qp.CompositeQuery.BuilderQueries[queryName].Having) > 0 {
|
|
conditions := []string{}
|
|
for _, having := range qp.CompositeQuery.BuilderQueries[queryName].Having {
|
|
conditions = append(conditions, fmt.Sprintf("%s %s %v", "value", having.Operator, having.Value))
|
|
}
|
|
havingClause := " HAVING " + strings.Join(conditions, " AND ")
|
|
formulaQuery += havingClause
|
|
}
|
|
return formulaQuery, nil
|
|
}
|
|
|
|
func (qb *QueryBuilder) PrepareLiveTailQuery(params *v3.QueryRangeParamsV3) (string, error) {
|
|
var queryStr string
|
|
var err error
|
|
compositeQuery := params.CompositeQuery
|
|
|
|
if compositeQuery != nil {
|
|
// There can only be a signle query and there is no concept of disabling queries
|
|
if len(compositeQuery.BuilderQueries) != 1 {
|
|
return "", fmt.Errorf("live tail is only supported for single query")
|
|
}
|
|
for queryName, query := range compositeQuery.BuilderQueries {
|
|
if query.Expression == queryName {
|
|
queryStr, err = qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{IsLivetailQuery: true})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
return queryStr, nil
|
|
}
|
|
|
|
func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...interface{}) (map[string]string, error) {
|
|
queries := make(map[string]string)
|
|
|
|
compositeQuery := params.CompositeQuery
|
|
|
|
if compositeQuery != nil {
|
|
err := qb.featureFlags.CheckFeature(constants.PreferRPM)
|
|
PreferRPMFeatureEnabled := err == nil
|
|
// Build queries for each builder query
|
|
for queryName, query := range compositeQuery.BuilderQueries {
|
|
// making a local clone since we should not update the global params if there is sift by
|
|
start := params.Start
|
|
end := params.End
|
|
if query.ShiftBy != 0 {
|
|
start = start - query.ShiftBy*1000
|
|
end = end - query.ShiftBy*1000
|
|
}
|
|
if query.Expression == queryName {
|
|
switch query.DataSource {
|
|
case v3.DataSourceTraces:
|
|
keys := map[string]v3.AttributeKey{}
|
|
if len(args) > 0 {
|
|
keys = args[0].(map[string]v3.AttributeKey)
|
|
}
|
|
// for ts query with group by and limit form two queries
|
|
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
|
|
limitQuery, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType, query,
|
|
keys, tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
placeholderQuery, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType,
|
|
query, keys, tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
query := fmt.Sprintf(placeholderQuery, limitQuery)
|
|
queries[queryName] = query
|
|
} else {
|
|
queryString, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType,
|
|
query, keys, tracesV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queries[queryName] = queryString
|
|
}
|
|
case v3.DataSourceLogs:
|
|
// for ts query with limit replace it as it is already formed
|
|
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
|
|
limitQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
placeholderQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
query := fmt.Sprintf(placeholderQuery, limitQuery)
|
|
queries[queryName] = query
|
|
} else {
|
|
queryString, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queries[queryName] = queryString
|
|
}
|
|
case v3.DataSourceMetrics:
|
|
queryString, err := qb.options.BuildMetricQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queries[queryName] = queryString
|
|
default:
|
|
zap.L().Error("Unknown data source", zap.String("dataSource", string(query.DataSource)))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build queries for each expression
|
|
for _, query := range compositeQuery.BuilderQueries {
|
|
if query.Expression != query.QueryName {
|
|
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queryString, err := expressionToQuery(params, queries, expression, query.QueryName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queries[query.QueryName] = queryString
|
|
}
|
|
}
|
|
}
|
|
|
|
// filter out disabled queries
|
|
for queryName := range queries {
|
|
if compositeQuery.BuilderQueries[queryName].Disabled {
|
|
delete(queries, queryName)
|
|
}
|
|
}
|
|
return queries, nil
|
|
}
|
|
|
|
// cacheKeyGenerator is a stateless type whose GenerateKeys method satisfies
// the cache.KeyGenerator interface.
type cacheKeyGenerator struct{}
|
|
|
|
func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string]string) string {
|
|
|
|
var modified []govaluate.ExpressionToken
|
|
tokens := expression.Tokens()
|
|
for idx := range tokens {
|
|
token := tokens[idx]
|
|
if token.Kind == govaluate.VARIABLE {
|
|
token.Value = keys[fmt.Sprintf("%s", token.Value)]
|
|
token.Meta = keys[fmt.Sprintf("%s", token.Meta)]
|
|
}
|
|
modified = append(modified, token)
|
|
}
|
|
// err should be nil here since the expression is already validated
|
|
formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
|
|
return formula.ExpressionString()
|
|
}
|
|
|
|
func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool {
|
|
variables := unique(expression.Vars())
|
|
for _, variable := range variables {
|
|
if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceMetrics {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func isLogExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool {
|
|
variables := unique(expression.Vars())
|
|
for _, variable := range variables {
|
|
if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceLogs {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
|
|
keys := make(map[string]string)
|
|
|
|
// For non-graph panels, we don't support caching
|
|
if params.CompositeQuery.PanelType != v3.PanelTypeGraph {
|
|
return keys
|
|
}
|
|
|
|
// Use query as the cache key for PromQL queries
|
|
if params.CompositeQuery.QueryType == v3.QueryTypePromQL {
|
|
for name, query := range params.CompositeQuery.PromQueries {
|
|
keys[name] = query.Query
|
|
}
|
|
return keys
|
|
}
|
|
|
|
// Build keys for each builder query
|
|
for queryName, query := range params.CompositeQuery.BuilderQueries {
|
|
if query.Expression == queryName && query.DataSource == v3.DataSourceLogs {
|
|
var parts []string
|
|
|
|
// We need to build uniqe cache query for BuilderQuery
|
|
parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
|
|
parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
|
|
parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
|
|
parts = append(parts, fmt.Sprintf("limit=%d", query.Limit))
|
|
|
|
if query.AggregateAttribute.Key != "" {
|
|
parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
|
|
}
|
|
|
|
if query.Filters != nil && len(query.Filters.Items) > 0 {
|
|
for idx, filter := range query.Filters.Items {
|
|
parts = append(parts, fmt.Sprintf("filter-%d=%s", idx, filter.CacheKey()))
|
|
}
|
|
}
|
|
|
|
if len(query.GroupBy) > 0 {
|
|
for idx, groupBy := range query.GroupBy {
|
|
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
|
|
}
|
|
}
|
|
|
|
if len(query.OrderBy) > 0 {
|
|
for idx, orderBy := range query.OrderBy {
|
|
parts = append(parts, fmt.Sprintf("orderBy-%d=%s", idx, orderBy.CacheKey()))
|
|
}
|
|
}
|
|
|
|
if len(query.Having) > 0 {
|
|
for idx, having := range query.Having {
|
|
parts = append(parts, fmt.Sprintf("having-%d=%s", idx, having.CacheKey()))
|
|
}
|
|
}
|
|
|
|
key := strings.Join(parts, "&")
|
|
keys[queryName] = key
|
|
} else if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics {
|
|
var parts []string
|
|
|
|
// We need to build uniqe cache query for BuilderQuery
|
|
|
|
parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
|
|
parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
|
|
parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
|
|
parts = append(parts, fmt.Sprintf("timeAggregation=%s", query.TimeAggregation))
|
|
parts = append(parts, fmt.Sprintf("spaceAggregation=%s", query.SpaceAggregation))
|
|
|
|
if query.AggregateAttribute.Key != "" {
|
|
parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
|
|
}
|
|
|
|
if query.Filters != nil && len(query.Filters.Items) > 0 {
|
|
for idx, filter := range query.Filters.Items {
|
|
parts = append(parts, fmt.Sprintf("filter-%d=%s", idx, filter.CacheKey()))
|
|
}
|
|
}
|
|
|
|
if len(query.GroupBy) > 0 {
|
|
for idx, groupBy := range query.GroupBy {
|
|
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
|
|
}
|
|
}
|
|
|
|
if len(query.Having) > 0 {
|
|
for idx, having := range query.Having {
|
|
parts = append(parts, fmt.Sprintf("having-%d=%s", idx, having.CacheKey()))
|
|
}
|
|
}
|
|
|
|
key := strings.Join(parts, "&")
|
|
keys[queryName] = key
|
|
}
|
|
}
|
|
|
|
// Build keys for each expression
|
|
for _, query := range params.CompositeQuery.BuilderQueries {
|
|
if query.Expression != query.QueryName {
|
|
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
|
|
|
|
if !isMetricExpression(expression, params) && !isLogExpression(expression, params) {
|
|
continue
|
|
}
|
|
|
|
expressionCacheKey := expressionToKey(expression, keys)
|
|
keys[query.QueryName] = expressionCacheKey
|
|
}
|
|
}
|
|
|
|
return keys
|
|
}
|
|
|
|
func NewKeyGenerator() cache.KeyGenerator {
|
|
return &cacheKeyGenerator{}
|
|
}
|