Friday, 14 July 2023

Collecting sFlow into ClickHouse for analytics

Overall pipeline:

host-sflow -> sFlow -> goflow2 collector -> kafka protobuf -> clickhouse

1. Server side: collecting sFlow

https://github.com/netsampler/goflow2
NetFlow/IPFIX/sFlow collector in Go


Kafka

Create the nflow topic.

goflow2

Runs in Docker:

docker run --net=host -ti netsampler/goflow2:latest -transport=kafka -transport.kafka.brokers=localhost:9092 -transport.kafka.topic=nflow -format=pb -format.protobuf.fixedlen=true
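
The -format.protobuf.fixedlen=true flag matters here because the ClickHouse Protobuf input format expects every message to be preceded by its length. A minimal Python sketch of that framing, assuming the prefix is a standard protobuf varint (the function names are illustrative and are not part of goflow2 or ClickHouse):

```python
from typing import List


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if n < 0:
        raise ValueError("varint must be non-negative")
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)  # last byte, high bit clear
            return bytes(out)


def frame(payload: bytes) -> bytes:
    """Prefix one serialized message with its length."""
    return encode_varint(len(payload)) + payload


def split_frames(stream: bytes) -> List[bytes]:
    """Split a byte stream of length-prefixed messages back into payloads."""
    messages = []
    pos = 0
    while pos < len(stream):
        length = 0
        shift = 0
        while True:
            if pos >= len(stream):
                raise ValueError("truncated length prefix")
            byte = stream[pos]
            pos += 1
            length |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                break
        if pos + length > len(stream):
            raise ValueError("truncated message")
        messages.append(stream[pos:pos + length])
        pos += length
    return messages
```

Without the length prefix a consumer cannot tell where one message ends and the next begins when several are read from one buffer.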

 

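ClickHouse

Download the protobuf protocol settings:

/var/lib/clickhouse/user_files/protocols.csv

The Kafka table below also refers to the flow.proto schema, which ClickHouse looks up in its format_schema_path directory.

Tables and the Kafka connection: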
CREATE TABLE IF NOT EXISTS nflow.flows_kafka
(
time_received UInt64,
time_flow_start UInt64,
sequence_num UInt32,
sampling_rate UInt64,
sampler_address FixedString(16),
src_addr FixedString(16),
dst_addr FixedString(16),
src_as UInt32,
dst_as UInt32,
etype UInt32,
proto UInt32,
src_port UInt32,
dst_port UInt32,
bytes UInt64,
packets UInt64
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = '127.0.0.1:9092',
kafka_topic_list = 'nflow',
kafka_group_name = 'clickhouse',
kafka_format = 'Protobuf',
kafka_schema = 'flow.proto:FlowMessage',
kafka_skip_broken_messages = 1;
        
CREATE TABLE IF NOT EXISTS nflow.flows_raw
(
date Date,
datetime DateTime,
sequence_num UInt32,
sampling_rate UInt64,
sampler_address String,
src_addr String,
dst_addr String,
src_as UInt32,
dst_as UInt32,
etype UInt32,
proto UInt32,
src_port UInt32,
dst_port UInt32,
bytes UInt64,
packets UInt64
) ENGINE = MergeTree()
PARTITION BY date
ORDER BY datetime
TTL date + toIntervalDay(30)
SETTINGS index_granularity = 8192;

        
CREATE MATERIALIZED VIEW IF NOT EXISTS nflow.flows_raw_mv TO nflow.flows_raw
AS SELECT
toDate(time_received) AS date,
time_flow_start AS datetime,
sequence_num,
sampling_rate,
IPv4NumToString(reinterpretAsUInt32(substring(reverse(sampler_address), 13, 4))) AS sampler_address,
IPv4NumToString(reinterpretAsUInt32(substring(reverse(src_addr), 13, 4))) AS src_addr,
IPv4NumToString(reinterpretAsUInt32(substring(reverse(dst_addr), 13, 4))) AS dst_addr,
src_as,
dst_as,
etype,
proto,
src_port,
dst_port,
bytes,
packets        
FROM nflow.flows_kafka;
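
The address expressions in the view are easier to follow when replayed outside ClickHouse. The Python sketch below mirrors each SQL step, assuming an IPv4 address arrives as 4 raw bytes and is zero-padded on the right to fill FixedString(16); the helper names are hypothetical:

```python
import ipaddress
import struct


def pad_fixed16(raw: bytes) -> bytes:
    """Zero-pad raw address bytes on the right, as FixedString(16) would (assumption)."""
    if len(raw) > 16:
        raise ValueError("address longer than 16 bytes")
    return raw.ljust(16, b"\x00")


def decode_ipv4(fixed16: bytes) -> str:
    """Replay IPv4NumToString(reinterpretAsUInt32(substring(reverse(x), 13, 4)))."""
    if len(fixed16) != 16:
        raise ValueError("expected exactly 16 bytes")
    reversed_bytes = fixed16[::-1]          # reverse(x)
    chunk = reversed_bytes[12:16]           # substring(..., 13, 4) is 1-based
    (number,) = struct.unpack("<I", chunk)  # reinterpretAsUInt32: little-endian
    return str(ipaddress.IPv4Address(number))  # IPv4NumToString


print(decode_ipv4(pad_fixed16(bytes([192, 168, 0, 1]))))  # 192.168.0.1
```

The same steps applied to a full 16-byte IPv6 address return an IPv4 string built from its first four bytes, so the view as written is only meaningful for IPv4 traffic.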


2. Installing the flow agent on a host

We use host-sflow (hsflowd).

cat /etc/hsflowd.conf
sflow {
  collector { ip=goflow2.host.example.com udpport=6343 }
   
  # ====== Local configuration ======
  pcap { dev = eth0 }
  pcap { dev = eth1 }
  # ...
}
 
systemctl enable hsflowd
systemctl start hsflowd 


3. Example queries

Active flow agents

select sampler_address,count() from nflow.flows_raw where datetime>now()-24*3600 group by sampler_address limit 100;

Find the sources of requests to a list of external resources

select src_addr,count() from nflow.flows_raw where datetime>now()-24*3600 and dst_addr in ('1.1.1.1','8.8.8.8') group by src_addr order by count() desc limit 100;
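
When the destination list changes often, the query text can be generated instead of edited by hand. This is a minimal sketch and not part of the original setup: build_sources_query is a hypothetical helper that validates each address with the standard ipaddress module before placing it into the same query as above.

```python
import ipaddress
from typing import Iterable


def build_sources_query(destinations: Iterable[str], hours: int = 24, limit: int = 100) -> str:
    """Build the 'who talks to these destinations' query for nflow.flows_raw."""
    # IPv4Address raises ValueError on anything that is not a plain IPv4 address,
    # so arbitrary text cannot end up inside the quoted IN (...) list.
    addrs = [str(ipaddress.IPv4Address(d)) for d in destinations]
    if not addrs:
        raise ValueError("at least one destination is required")
    in_list = ", ".join(f"'{a}'" for a in addrs)
    return (
        "SELECT src_addr, count() AS flows "
        "FROM nflow.flows_raw "
        f"WHERE datetime > now() - {int(hours)} * 3600 "
        f"AND dst_addr IN ({in_list}) "
        "GROUP BY src_addr ORDER BY flows DESC "
        f"LIMIT {int(limit)}"
    )


print(build_sources_query(["1.1.1.1", "8.8.8.8"]))
```

Note that count() here counts sampled flow records, not packets or bytes on the wire.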