It should have schema-free support. Raw logs are unstructured free text that is practically impossible to aggregate or compute on, so they need to be turned into structured tables (a process known as "ETL") before they are put into a database or data warehouse for analysis. If the schema changed, lots of complicated adjustments had to be made to the ETL pipelines and the structured tables. To avoid this, semi-structured logs, mostly in JSON format, emerged: you can add or delete fields in these logs, and the log storage system adjusts its schema accordingly.
It should be low-cost. Logs are huge and they are generated continuously. A fairly big company produces 10~100 TBs of log data per day, and for business or compliance reasons it needs to keep the logs around for half a year or longer. That means storing a volume of logs measured in petabytes, so the cost is considerable.
It should be capable of real-time processing. Logs should be written in real time; otherwise, engineers won't be able to catch the latest events in troubleshooting and security tracking. In addition, a good log system should provide full-text search capabilities and respond to interactive queries quickly.
A popular log processing solution in the data industry is the ELK stack: Elasticsearch, Logstash, and Kibana. Its pipeline can be split into five modules, from log collection and processing to storage and query. Elasticsearch, however, brings notable cost and stability issues:
Computation cost: During data writing, Elasticsearch executes compute-intensive operations including inverted index creation, tokenization, and inverted index ranking. Under these circumstances, data is written into Elasticsearch at a speed of only around 2 MB/s per core. When CPU resources are tight, write requests often get rejected during peak times, which further leads to higher latency.
Storage cost: To speed up retrieval, Elasticsearch stores the forward indexes, inverted indexes, and doc values of the original data, which consumes much more storage space. The compression ratio of a single data copy is only 1.5:1, compared to the 5:1 of most log solutions.
During data writing peaks: clusters are prone to overload.
During queries: since all queries are processed in memory, big queries can easily lead to a JVM OOM.
Slow recovery: after a cluster failure, Elasticsearch has to reload its indexes, which is resource-intensive, so recovery takes many minutes. That challenges service availability guarantees.
Increase writing throughput: The write performance of Elasticsearch is bottlenecked by data parsing and inverted index creation, so we optimized Apache Doris in these areas: we sped up data parsing and index creation with SIMD CPU vector instructions, and we removed data structures that are unnecessary in log analysis scenarios, such as forward indexes, to simplify index creation.
Reduce storage costs: We removed forward indexes, which represented 30% of index data. We adopted columnar storage and the ZSTD compression algorithm, achieving a compression ratio of 5:1 to 10:1. Given that a large part of historical logs is rarely accessed, we introduced tiered storage to separate hot and cold data: logs older than a specified time period are moved to object storage, which is much less expensive. This can reduce storage costs by around 70%.
Ingestion: Apache Doris supports various ingestion methods for log data. You can push logs to Doris via the HTTP Output plugin of Logstash, use Flink to pre-process the logs before writing them into Doris, or load logs from Kafka or object storage into Doris via Routine Load and S3 Load.
Analysis: You can put log data in Doris and conduct join queries across logs and other data in the data warehouse.
Application: Apache Doris is compatible with the MySQL protocol, so you can integrate a wide variety of data analytics tools and clients with Doris, such as Grafana and Tableau. You can also connect applications to Doris via the JDBC and ODBC APIs. We are planning to build a Kibana-like system to visualize logs.
Moreover, Apache Doris has better schema-free support and a more user-friendly analytic engine.
Firstly, we worked on the data types. We optimized string search and regular expression matching for the TEXT type through vectorization, bringing a performance increase of 2~10 times. For JSON strings, Apache Doris parses and stores them in a more compact and efficient binary format, which speeds up queries by 4 times. We also added a new data type for complex data: Array/Map. It can turn concatenated strings into structured data, allowing for a higher compression ratio and faster queries.
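As a minimal sketch of what this looks like in SQL (the json_log_example table, the payload column, and the $.level path are made up for illustration; the JSON type and the json_extract() function are assumed to be available in your Doris version):
-- Store raw JSON events in a JSON-typed column (binary-encoded internally)
CREATE TABLE json_log_example
(
    `ts` DATETIMEV2,
    `payload` JSON
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES ("replication_num" = "1");
-- Extract a field from the binary-encoded JSON at query time
SELECT json_extract(`payload`, '$.level') AS level, COUNT(*) AS cnt
FROM json_log_example
GROUP BY json_extract(`payload`, '$.level');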
Secondly, Apache Doris supports schema evolution. This means you can adjust the schema as your business changes. You can add or delete fields and indexes, and change the data types for fields.
-- Add a column. The result is returned in milliseconds.
ALTER TABLE lineitem ADD COLUMN l_new_column INT;
-- Add an inverted index. Doris generates the inverted index for all newly written data from then on.
ALTER TABLE table_name ADD INDEX index_name(column_name) USING INVERTED;
-- Build index for the specified historical data partitions.
BUILD INDEX index_name ON table_name PARTITIONS(partition_name1, partition_name2);
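Changing a column's data type goes through the same ALTER TABLE mechanism. A sketch, assuming that widening the l_new_column added above from INT to BIGINT is an allowed conversion in your Doris version:
-- Change the data type of an existing column; the conversion runs as a background schema change job
ALTER TABLE lineitem MODIFY COLUMN l_new_column BIGINT;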
Create tables
Create a database, an S3 resource for cold data, a storage policy that moves data to object storage after one day (86400 seconds), and then the log table that uses them:
CREATE DATABASE log_db;
USE log_db;
CREATE RESOURCE "log_s3"
PROPERTIES
(
"type" = "s3",
"s3.endpoint" = "your_endpoint_url",
"s3.region" = "your_region",
"s3.bucket" = "your_bucket",
"s3.root.path" = "your_path",
"s3.access_key" = "your_ak",
"s3.secret_key" = "your_sk"
);
CREATE STORAGE POLICY log_policy_1day
PROPERTIES(
"storage_resource" = "log_s3",
"cooldown_ttl" = "86400"
);
CREATE TABLE log_table
(
`ts` DATETIMEV2,
`clientip` VARCHAR(20),
`request` TEXT,
`status` INT,
`size` INT,
INDEX idx_size (`size`) USING INVERTED,
INDEX idx_status (`status`) USING INVERTED,
INDEX idx_clientip (`clientip`) USING INVERTED,
INDEX idx_request (`request`) USING INVERTED PROPERTIES("parser" = "english")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES (
"replication_num" = "1",
"storage_policy" = "log_policy_1day",
"deprecated_dynamic_schema" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "AUTO",
"dynamic_partition.replication_num" = "1"
);
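Before wiring up an ingestion pipeline, you can smoke-test the table with a hand-written row (the values below are made up; NOW() keeps the row inside the dynamic partition range):
-- Insert one sample log record for testing
INSERT INTO log_table VALUES (NOW(), '1.2.3.4', 'GET /index.html HTTP/1.1', 200, 512);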
Ingest from Kafka
For JSON logs that are written into Kafka message queues, create a Routine Load so that Doris pulls data from Kafka. The following is an example; the property.* configurations are optional:
-- Prepare the Kafka cluster and topic ("log_topic")
-- Create Routine Load, load data from Kafka log_topic to "log_table"
CREATE ROUTINE LOAD load_log_kafka ON log_db.log_table
COLUMNS(ts, clientip, request, status, size)
PROPERTIES (
"max_batch_interval" = "10",
"max_batch_rows" = "1000000",
"max_batch_size" = "109715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA (
"kafka_broker_list" = "host:port",
"kafka_topic" = "log_topic",
"property.group.id" = "your_group_id",
"property.security.protocol"="SASL_PLAINTEXT",
"property.sasl.mechanism"="GSSAPI",
"property.sasl.kerberos.service.name"="kafka",
"property.sasl.kerberos.keytab"="/path/to/xxx.keytab",
"property.sasl.kerberos.principal"="[email protected]"
);
You can check how the Routine Load runs via the SHOW ROUTINE LOAD command.
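For example, assuming the load job created above:
-- Show the state, progress, and error statistics of the Routine Load job
SHOW ROUTINE LOAD FOR log_db.load_log_kafka;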
Ingest via Logstash
Specify the batch size and batch delay in logstash.yml to improve data writing performance:
pipeline.batch.size: 100000
pipeline.batch.delay: 10000
Add HTTP Output to the log collection configuration file testlog.conf, where url is the Stream Load address in Doris.
The Authorization header uses HTTP basic auth; it is computed with echo -n 'username:password' | base64.
The load_to_single_tablet header reduces the number of small files generated during data ingestion.
output {
http {
follow_redirects => true
keepalive => false
http_method => "put"
url => "http://172.21.0.5:8640/api/log_db/log_table/_stream_load"
headers => [
"format", "json",
"strip_outer_array", "true",
"load_to_single_tablet", "true",
"Authorization", "Basic cm9vdDo=",
"Expect", "100-continue"
]
format => "json_batch"
}
}
Ingest via self-defined program
Use HTTP basic auth for authorization; it is computed with echo -n 'username:password' | base64.
HTTP header "format:json": the data format is specified as JSON.
HTTP header "read_json_by_line:true": each line is a JSON record.
HTTP header "load_to_single_tablet:true": write to one tablet at a time.
curl \
--location-trusted \
-u username:password \
-H "format:json" \
-H "read_json_by_line:true" \
-H "load_to_single_tablet:true" \
-T logfile.json \
http://fe_host:fe_http_port/api/log_db/log_table/_stream_load
To analyze the logs, connect to Doris with any MySQL-protocol client:
mysql -h fe_host -P fe_mysql_port -u root -Dlog_db
A few common queries in log analysis:
-- Check the latest 10 records
SELECT * FROM log_table ORDER BY ts DESC LIMIT 10;
-- Check the latest 10 records from client IP 8.8.8.8
SELECT * FROM log_table WHERE clientip = '8.8.8.8' ORDER BY ts DESC LIMIT 10;
-- Retrieve the latest 10 records whose request field contains "error" or "404"
SELECT * FROM log_table WHERE request MATCH_ANY 'error 404' ORDER BY ts DESC LIMIT 10;
-- Retrieve the latest 10 records whose request field contains both "image" and "faq"
SELECT * FROM log_table WHERE request MATCH_ALL 'image faq' ORDER BY ts DESC LIMIT 10;
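Beyond point lookups and full-text search, ordinary SQL aggregations run on the same table. A sketch (the one-hour window is arbitrary):
-- Count requests per HTTP status code over the last hour
SELECT status, COUNT(*) AS cnt
FROM log_table
WHERE ts >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
GROUP BY status
ORDER BY cnt DESC;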