logs-analyzer/signoz/pkg/query-service/app/logs/v3/query_builder.go

572 lines
19 KiB
Go
Raw Permalink Normal View History

2024-09-02 22:47:30 +03:00
package v3
import (
"fmt"
"strings"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP05: 0.05,
v3.AggregateOperatorP10: 0.10,
v3.AggregateOperatorP20: 0.20,
v3.AggregateOperatorP25: 0.25,
v3.AggregateOperatorP50: 0.50,
v3.AggregateOperatorP75: 0.75,
v3.AggregateOperatorP90: 0.90,
v3.AggregateOperatorP95: 0.95,
v3.AggregateOperatorP99: 0.99,
}
var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
v3.AggregateOperatorAvg: "avg",
v3.AggregateOperatorMax: "max",
v3.AggregateOperatorMin: "min",
v3.AggregateOperatorSum: "sum",
v3.AggregateOperatorRateSum: "sum",
v3.AggregateOperatorRateAvg: "avg",
v3.AggregateOperatorRateMax: "max",
v3.AggregateOperatorRateMin: "min",
}
var logOperators = map[v3.FilterOperator]string{
v3.FilterOperatorEqual: "=",
v3.FilterOperatorNotEqual: "!=",
v3.FilterOperatorLessThan: "<",
v3.FilterOperatorLessThanOrEq: "<=",
v3.FilterOperatorGreaterThan: ">",
v3.FilterOperatorGreaterThanOrEq: ">=",
v3.FilterOperatorLike: "ILIKE",
v3.FilterOperatorNotLike: "NOT ILIKE",
v3.FilterOperatorContains: "ILIKE",
v3.FilterOperatorNotContains: "NOT ILIKE",
v3.FilterOperatorRegex: "match(%s, %s)",
v3.FilterOperatorNotRegex: "NOT match(%s, %s)",
v3.FilterOperatorIn: "IN",
v3.FilterOperatorNotIn: "NOT IN",
v3.FilterOperatorExists: "has(%s_%s_key, '%s')",
v3.FilterOperatorNotExists: "not has(%s_%s_key, '%s')",
}
const BODY = "body"
func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
if columnType == v3.AttributeKeyTypeTag {
return "attributes"
}
return "resources"
}
func getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string {
if columnDataType == v3.AttributeKeyDataTypeFloat64 {
return "float64"
}
if columnDataType == v3.AttributeKeyDataTypeInt64 {
return "int64"
}
if columnDataType == v3.AttributeKeyDataTypeBool {
return "bool"
}
return "string"
}
// getClickhouseColumnName returns the corresponding clickhouse column name for the given attribute/resource key
func getClickhouseColumnName(key v3.AttributeKey) string {
clickhouseColumn := key.Key
if key.Key == constants.TIMESTAMP || key.Key == "id" {
return key.Key
}
//if the key is present in the topLevelColumn then it will be only searched in those columns,
//regardless if it is indexed/present again in resource or column attribute
if !key.IsColumn {
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
return clickhouseColumn
}
// check if it is a static field
if key.Type == v3.AttributeKeyTypeUnspecified {
// name is the column name
return clickhouseColumn
}
// materialized column created from query
clickhouseColumn = utils.GetClickhouseColumnName(string(key.Type), string(key.DataType), key.Key)
return clickhouseColumn
}
// getSelectLabels returns the select labels for the query based on groupBy and aggregateOperator
func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
var selectLabels string
if aggregatorOperator == v3.AggregateOperatorNoOp {
selectLabels = ""
} else {
for _, tag := range groupBy {
columnName := getClickhouseColumnName(tag)
selectLabels += fmt.Sprintf(" %s as `%s`,", columnName, tag.Key)
}
}
return selectLabels
}
func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
var selectLabels []string
if aggregatorOperator == v3.AggregateOperatorNoOp {
return ""
} else {
for _, tag := range groupBy {
selectLabels = append(selectLabels, "`"+tag.Key+"`")
}
}
return strings.Join(selectLabels, ",")
}
func GetExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string {
if item.Key.Type == v3.AttributeKeyTypeUnspecified {
top := "!="
if op == v3.FilterOperatorNotExists {
top = "="
}
if val, ok := constants.StaticFieldsLogsV3[item.Key.Key]; ok {
// skip for timestamp and id
if val.Key == "" {
return ""
}
columnName := getClickhouseColumnName(item.Key)
if val.DataType == v3.AttributeKeyDataTypeString {
return fmt.Sprintf("%s %s ''", columnName, top)
} else {
// we just have two types, number and string
return fmt.Sprintf("%s %s 0", columnName, top)
}
}
} else if item.Key.IsColumn {
val := true
if op == v3.FilterOperatorNotExists {
val = false
}
return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseColumnName(item.Key), "`"), val)
}
columnType := getClickhouseLogsColumnType(item.Key.Type)
columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType)
return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key)
}
func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) {
var conditions []string
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
if item.Key.IsJSON {
filter, err := GetJSONFilter(item)
if err != nil {
return "", err
}
conditions = append(conditions, filter)
continue
}
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
var value interface{}
var err error
if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
value, err = utils.ValidateAndCastValue(item.Value, item.Key.DataType)
if err != nil {
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
}
}
if logsOp, ok := logOperators[op]; ok {
switch op {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
conditions = append(conditions, GetExistsNexistsFilter(op, item))
case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
columnName := getClickhouseColumnName(item.Key)
fmtVal := utils.ClickHouseFormattedValue(value)
conditions = append(conditions, fmt.Sprintf(logsOp, columnName, fmtVal))
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
columnName := getClickhouseColumnName(item.Key)
val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
if columnName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
conditions = append(conditions, fmt.Sprintf("lower(%s) %s lower('%%%s%%')", columnName, logsOp, val))
} else {
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, val))
}
default:
columnName := getClickhouseColumnName(item.Key)
fmtVal := utils.ClickHouseFormattedValue(value)
// for use lower for like and ilike
if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike {
if columnName == BODY {
logsOp = strings.Replace(logsOp, "ILIKE", "LIKE", 1) // removing i from ilike and not ilike
columnName = fmt.Sprintf("lower(%s)", columnName)
fmtVal = fmt.Sprintf("lower(%s)", fmtVal)
}
}
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, logsOp, fmtVal))
}
} else {
return "", fmt.Errorf("unsupported operator: %s", op)
}
}
}
// add group by conditions to filter out log lines which doesn't have the key
for _, attr := range groupBy {
if !attr.IsColumn {
columnType := getClickhouseLogsColumnType(attr.Type)
columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key))
} else if attr.Type != v3.AttributeKeyTypeUnspecified {
// for materialzied columns
conditions = append(conditions, fmt.Sprintf("%s_exists`=true", strings.TrimSuffix(getClickhouseColumnName(attr), "`")))
}
}
// add conditions for aggregate attribute
if aggregateAttribute.Key != "" {
existsFilter := GetExistsNexistsFilter(v3.FilterOperatorExists, v3.FilterItem{Key: aggregateAttribute})
conditions = append(conditions, existsFilter)
}
queryString := strings.Join(conditions, " AND ")
return queryString, nil
}
func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) {
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute)
if err != nil {
return "", err
}
if len(filterSubQuery) > 0 {
filterSubQuery = " AND " + filterSubQuery
}
// timerange will be sent in epoch millisecond
timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d)", utils.GetEpochNanoSecs(start), utils.GetEpochNanoSecs(end))
selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)
having := having(mq.Having)
if having != "" {
having = " having " + having
}
var queryTmpl string
if graphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT"
} else if panelType == v3.PanelTypeTable {
queryTmpl =
"SELECT now() as ts,"
// step or aggregate interval is whole time period in case of table panel
step = (utils.GetEpochNanoSecs(end) - utils.GetEpochNanoSecs(start)) / 1000000000
} else if panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue {
// Select the aggregate value for interval
queryTmpl =
fmt.Sprintf("SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts,", step)
}
queryTmpl =
queryTmpl + selectLabels +
" %s as value " +
"from signoz_logs.distributed_logs " +
"where " + timeFilter + "%s" +
"%s%s" +
"%s"
// we dont need value for first query
// going with this route as for a cleaner approach on implementation
if graphLimitQtype == constants.FirstQueryGraphLimit {
queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
}
groupBy := groupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...)
if panelType != v3.PanelTypeList && groupBy != "" {
groupBy = " group by " + groupBy
}
orderBy := orderByAttributeKeyTags(panelType, mq.OrderBy, mq.GroupBy)
if panelType != v3.PanelTypeList && orderBy != "" {
orderBy = " order by " + orderBy
}
if graphLimitQtype == constants.SecondQueryGraphLimit {
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
}
aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey = getClickhouseColumnName(mq.AggregateAttribute)
}
switch mq.AggregateOperator {
case v3.AggregateOperatorRate:
rate := float64(step)
if preferRPM {
rate = rate / 60.0
}
op := fmt.Sprintf("count(%s)/%f", aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case
v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRateMin:
rate := float64(step)
if preferRPM {
rate = rate / 60.0
}
op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCount:
op := "toFloat64(count(*))"
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorCountDistinct:
op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggregationKey)
query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
return query, nil
case v3.AggregateOperatorNoOp:
queryTmpl := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where %s%s order by %s"
query := fmt.Sprintf(queryTmpl, timeFilter, filterSubQuery, orderBy)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator")
}
}
func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, v3.AttributeKey{})
if err != nil {
return "", err
}
switch mq.AggregateOperator {
case v3.AggregateOperatorNoOp:
query := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where "
if len(filterSubQuery) > 0 {
query = query + filterSubQuery + " AND "
}
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator in live tail")
}
}
// groupBy returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) string {
if (graphLimitQtype != constants.FirstQueryGraphLimit) && (panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue) {
tags = append(tags, "ts")
}
return strings.Join(tags, ",")
}
func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, "`"+tag.Key+"`")
}
return groupBy(panelType, graphLimitQtype, groupTags...)
}
// orderBy returns a string of comma separated tags for order by clause
// if there are remaining items which are not present in tags they are also added
// if the order is not specified, it defaults to ASC
func orderBy(panelType v3.PanelType, items []v3.OrderBy, tagLookup map[string]struct{}) []string {
var orderBy []string
for _, item := range items {
if item.ColumnName == constants.SigNozOrderByValue {
orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order))
} else if _, ok := tagLookup[item.ColumnName]; ok {
orderBy = append(orderBy, fmt.Sprintf("`%s` %s", item.ColumnName, item.Order))
} else if panelType == v3.PanelTypeList {
attr := v3.AttributeKey{Key: item.ColumnName, DataType: item.DataType, Type: item.Type, IsColumn: item.IsColumn}
name := getClickhouseColumnName(attr)
if item.IsColumn {
name = "`" + name + "`"
}
orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
}
}
return orderBy
}
func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []v3.AttributeKey) string {
tagLookup := map[string]struct{}{}
for _, v := range tags {
tagLookup[v.Key] = struct{}{}
}
orderByArray := orderBy(panelType, items, tagLookup)
if len(orderByArray) == 0 {
if panelType == v3.PanelTypeList {
orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC")
} else {
orderByArray = append(orderByArray, "value DESC")
}
}
str := strings.Join(orderByArray, ",")
return str
}
func having(items []v3.Having) string {
// aggregate something and filter on that aggregate
var having []string
for _, item := range items {
having = append(having, fmt.Sprintf("value %s %s", item.Operator, utils.ClickHouseFormattedValue(item.Value)))
}
return strings.Join(having, " AND ")
}
func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
// the timestamp picked is not relevant here since the final value used is show the single
// chart with just the query value.
switch reduceTo {
case v3.ReduceToOperatorLast:
query = fmt.Sprintf("SELECT anyLast(value) as value, now() as ts FROM (%s)", query)
case v3.ReduceToOperatorSum:
query = fmt.Sprintf("SELECT sum(value) as value, now() as ts FROM (%s)", query)
case v3.ReduceToOperatorAvg:
query = fmt.Sprintf("SELECT avg(value) as value, now() as ts FROM (%s)", query)
case v3.ReduceToOperatorMax:
query = fmt.Sprintf("SELECT max(value) as value, now() as ts FROM (%s)", query)
case v3.ReduceToOperatorMin:
query = fmt.Sprintf("SELECT min(value) as value, now() as ts FROM (%s)", query)
default:
return "", fmt.Errorf("unsupported reduce operator")
}
return query, nil
}
func addLimitToQuery(query string, limit uint64) string {
if limit == 0 {
return query
}
return fmt.Sprintf("%s LIMIT %d", query, limit)
}
func addOffsetToQuery(query string, offset uint64) string {
return fmt.Sprintf("%s OFFSET %d", query, offset)
}
type Options struct {
GraphLimitQtype string
IsLivetailQuery bool
PreferRPM bool
}
func isOrderByTs(orderBy []v3.OrderBy) bool {
if len(orderBy) == 1 && (orderBy[0].Key == constants.TIMESTAMP || orderBy[0].ColumnName == constants.TIMESTAMP) {
return true
}
return false
}
// PrepareLogsQuery prepares the query for logs
// start and end are in epoch millisecond
// step is in seconds
func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) {
// adjust the start and end time to the step interval
// NOTE: Disabling this as it's creating confusion between charts and actual data
// if panelType != v3.PanelTypeList {
// start = start - (start % (mq.StepInterval * 1000))
// end = end - (end % (mq.StepInterval * 1000))
// }
if options.IsLivetailQuery {
query, err := buildLogsLiveTailQuery(mq)
if err != nil {
return "", err
}
return query, nil
} else if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
// give me just the groupby names
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM)
if err != nil {
return "", err
}
query = addLimitToQuery(query, mq.Limit)
return query, nil
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM)
if err != nil {
return "", err
}
return query, nil
}
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM)
if err != nil {
return "", err
}
if panelType == v3.PanelTypeValue {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
if panelType == v3.PanelTypeList {
// check if limit exceeded
if mq.Limit > 0 && mq.Offset >= mq.Limit {
return "", fmt.Errorf("max limit exceeded")
}
if mq.PageSize > 0 {
if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit {
query = addLimitToQuery(query, mq.Limit-mq.Offset)
} else {
query = addLimitToQuery(query, mq.PageSize)
}
// add offset to the query only if it is not orderd by timestamp.
if !isOrderByTs(mq.OrderBy) {
query = addOffsetToQuery(query, mq.Offset)
}
} else {
query = addLimitToQuery(query, mq.Limit)
}
} else if panelType == v3.PanelTypeTable {
query = addLimitToQuery(query, mq.Limit)
}
return query, err
}