logs-analyzer/signoz/pkg/query-service/app/logparsingpipeline/preview.go

231 lines
6.7 KiB
Go
Raw Permalink Normal View History

2024-09-02 22:47:30 +03:00
package logparsingpipeline
import (
"context"
"sort"
"strings"
"time"
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
"go.signoz.io/signoz/pkg/query-service/collectorsimulator"
"go.signoz.io/signoz/pkg/query-service/model"
)
func SimulatePipelinesProcessing(
ctx context.Context,
pipelines []Pipeline,
logs []model.SignozLog,
) (
output []model.SignozLog, collectorWarnAndErrorLogs []string, apiErr *model.ApiError,
) {
if len(pipelines) < 1 {
return logs, nil, nil
}
// Collector simulation does not guarantee that logs will come
// out in the same order as in the input.
//
// Add a temp attribute for sorting logs in simulation output
inputOrderAttribute := "__signoz_input_idx__"
for i := 0; i < len(logs); i++ {
if logs[i].Attributes_int64 == nil {
logs[i].Attributes_int64 = map[string]int64{}
}
logs[i].Attributes_int64[inputOrderAttribute] = int64(i)
}
simulatorInputPLogs := SignozLogsToPLogs(logs)
processorFactories, err := processor.MakeFactoryMap(
logstransformprocessor.NewFactory(),
)
if err != nil {
return nil, nil, model.InternalError(errors.Wrap(
err, "could not construct processor factory map",
))
}
// Pipelines translate to logtransformprocessors in otel collector config.
// Each logtransformprocessor (stanza) does its own batching with a flush
// interval of 100ms. So e2e processing time for logs grows linearly with
// the number of logtransformprocessors involved.
// See defaultFlushInterval at https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/adapter/emitter.go
// TODO(Raj): Remove this after flushInterval is exposed in logtransformprocessor config
timeout := time.Millisecond * time.Duration(len(pipelines)*100+100)
configGenerator := func(baseConf []byte) ([]byte, error) {
updatedConf, apiErr := GenerateCollectorConfigWithPipelines(baseConf, pipelines)
if apiErr != nil {
return nil, apiErr.ToError()
}
return updatedConf, nil
}
outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing(
ctx,
processorFactories,
configGenerator,
simulatorInputPLogs,
timeout,
)
if apiErr != nil {
return nil, collectorErrs, model.WrapApiError(apiErr,
"could not simulate log pipelines processing.\nCollector errors",
)
}
outputSignozLogs := PLogsToSignozLogs(outputPLogs)
// Sort output logs by their order in the input and remove the temp ordering attribute
sort.Slice(outputSignozLogs, func(i, j int) bool {
iIdx := outputSignozLogs[i].Attributes_int64[inputOrderAttribute]
jIdx := outputSignozLogs[j].Attributes_int64[inputOrderAttribute]
return iIdx < jIdx
})
for _, sigLog := range outputSignozLogs {
delete(sigLog.Attributes_int64, inputOrderAttribute)
}
for _, log := range collectorErrs {
// if log is empty or log comes from featuregate.go, then remove it
if log == "" || strings.Contains(log, "featuregate.go") {
continue
}
collectorWarnAndErrorLogs = append(collectorWarnAndErrorLogs, log)
}
return outputSignozLogs, collectorWarnAndErrorLogs, nil
}
// plog doesn't contain an ID field.
// SignozLog.ID is stored as a log attribute in plogs for processing
// and gets hydrated back later.
const SignozLogIdAttr = "__signoz_log_id__"
func SignozLogsToPLogs(logs []model.SignozLog) []plog.Logs {
result := []plog.Logs{}
for _, log := range logs {
pl := plog.NewLogs()
rl := pl.ResourceLogs().AppendEmpty()
resourceAttribs := rl.Resource().Attributes()
for k, v := range log.Resources_string {
resourceAttribs.PutStr(k, v)
}
scopeLog := rl.ScopeLogs().AppendEmpty()
slRecord := scopeLog.LogRecords().AppendEmpty()
slRecord.SetTimestamp(pcommon.NewTimestampFromTime(
time.Unix(0, int64(log.Timestamp)),
))
var traceIdBuf [16]byte
copy(traceIdBuf[:], []byte(log.TraceID))
slRecord.SetTraceID(traceIdBuf)
var spanIdBuf [8]byte
copy(spanIdBuf[:], []byte(log.SpanID))
slRecord.SetSpanID(spanIdBuf)
slRecord.SetFlags(plog.LogRecordFlags(log.TraceFlags))
slRecord.SetSeverityText(log.SeverityText)
slRecord.SetSeverityNumber(plog.SeverityNumber(log.SeverityNumber))
slRecord.Body().SetStr(log.Body)
slAttribs := slRecord.Attributes()
for k, v := range log.Attributes_int64 {
slAttribs.PutInt(k, v)
}
for k, v := range log.Attributes_float64 {
slAttribs.PutDouble(k, v)
}
for k, v := range log.Attributes_string {
slAttribs.PutStr(k, v)
}
slAttribs.PutStr(SignozLogIdAttr, log.ID)
result = append(result, pl)
}
return result
}
func PLogsToSignozLogs(plogs []plog.Logs) []model.SignozLog {
result := []model.SignozLog{}
for _, pl := range plogs {
resourceLogsSlice := pl.ResourceLogs()
for i := 0; i < resourceLogsSlice.Len(); i++ {
rl := resourceLogsSlice.At(i)
scopeLogsSlice := rl.ScopeLogs()
for j := 0; j < scopeLogsSlice.Len(); j++ {
sl := scopeLogsSlice.At(j)
lrSlice := sl.LogRecords()
for k := 0; k < lrSlice.Len(); k++ {
lr := lrSlice.At(k)
// Recover ID for the log and remove temp attrib used for storing it
signozLogId := ""
logIdVal, exists := lr.Attributes().Get(SignozLogIdAttr)
if exists {
signozLogId = logIdVal.Str()
}
lr.Attributes().Remove(SignozLogIdAttr)
signozLog := model.SignozLog{
Timestamp: uint64(lr.Timestamp()),
ID: signozLogId,
TraceID: lr.TraceID().String(),
SpanID: lr.SpanID().String(),
TraceFlags: uint32(lr.Flags()),
SeverityText: lr.SeverityText(),
SeverityNumber: uint8(lr.SeverityNumber()),
Body: lr.Body().AsString(),
Resources_string: pMapToStrMap(rl.Resource().Attributes()),
Attributes_string: map[string]string{},
Attributes_int64: map[string]int64{},
Attributes_float64: map[string]float64{},
}
// Populate signozLog.Attributes_...
lr.Attributes().Range(func(k string, v pcommon.Value) bool {
if v.Type() == pcommon.ValueTypeDouble {
signozLog.Attributes_float64[k] = v.Double()
} else if v.Type() == pcommon.ValueTypeInt {
signozLog.Attributes_int64[k] = v.Int()
} else {
signozLog.Attributes_string[k] = v.AsString()
}
return true
})
result = append(result, signozLog)
}
}
}
}
return result
}
func pMapToStrMap(pMap pcommon.Map) map[string]string {
result := map[string]string{}
pMap.Range(func(k string, v pcommon.Value) bool {
result[k] = v.AsString()
return true
})
return result
}