Zero-Cost Threat Hunting with Elastic Stack
Setting up a Zero-Cost Threat Hunting Platform with Elastic Stack and the AlienVault Reputation List
Elastic Stack is an excellent suite of products that has been used for all kinds of analysis activities since its inception, thanks to its powerful search and visualization capabilities. Here we leverage the Elastic Stack, together with a few other components, as a threat hunting platform: we build a reliable blacklist of malicious IPs obtained from OSINT and analyze network traffic in real time against it, flagging any traffic to those IPs. To achieve this we use FireHOL, a project that collects and analyzes security IP feeds related mainly to online attacks, online service abuse, malware, botnets and other cybercrime activities. It publishes several lists; in our case we choose the alienvault_reputation list.
Components Used
Elasticsearch
Elasticsearch will act as our log repository. It is incredibly powerful and versatile, and when coupled with Logstash for log ingestion and Kibana for visualization, it provides a robust platform for all types of data.
Logstash
A Logstash pipeline is made up of three parts: input, filter and output. The input section defines the source of the logging data. The filter section can parse, normalize, transform or otherwise prepare the data before it is sent to Elasticsearch or another analytics engine. The output section defines where the data processed by Logstash is stored; this can be Elasticsearch, Kafka or various other destinations. Please refer to the output plugin documentation for the supported options.
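To make the structure concrete, here is a minimal pipeline sketch; the port and index name are illustrative placeholders (the actual input used in this article appears under Log collection below):

input {
  syslog {
    port => 5066
  }
}
filter {
  kv { }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "fwlog-%{+YYYY.MM.dd}"
  }
}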
Kibana
Kibana is an open-source data visualization dashboard for Elasticsearch. It provides visualization capabilities on top of the content indexed in an Elasticsearch cluster. Users can create bar, line and scatter plots, pie charts and maps on top of large volumes of data.
ElastAlert
ElastAlert is an open source project started by the engineers at Yelp to provide an alerting mechanism for Elasticsearch. It’s an independent project that doesn’t need to run on the same server. It simply queries Elasticsearch through the REST API and has numerous outputs to alert on a match. One of those outputs will feed the information into Slack.
Slack
Slack is a cloud-based proprietary instant messaging platform developed by Slack Technologies.
Memcached
Memcached is a free and open-source, high-performance, distributed memory object caching system, generic in nature but intended for use in speeding up dynamic web applications by alleviating database load. It is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of database calls, API calls, or page rendering.
Memcached is simple yet powerful. Its simple design promotes quick deployment, ease of development, and solves many problems facing large data caches. Its API is available for most popular languages.
Cron
The software utility cron is a time-based job scheduler in Unix-like operating systems. Users who set up and maintain software environments use cron to schedule jobs to run periodically at fixed times, dates, or intervals.
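For reference, a crontab entry consists of five time fields (minute, hour, day of month, month, day of week) followed by the command to run; entries in the system-wide /etc/crontab additionally take a user field before the command. A hypothetical entry that runs a script every day at 02:30 would look like:

30 2 * * * /path/to/script.sh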
Pymemcache
A comprehensive, fast, pure-Python memcached client library.
https://pymemcache.readthedocs.io/en/latest/getting_started.html
from pymemcache.client.base import Client

client = Client(('localhost', 11211))
client.set('some_key', 'some_value')
result = client.get('some_key')
Log collection
The Logstash syslog input plugin is configured to receive the logs from the firewall, which ships them in syslog format:
input {
  syslog {
    port => 5066
    type => "fwlog"
  }
}
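To verify the listener is up before pointing the firewall at it, you can send a test message from another shell; this assumes the util-linux version of logger, which supports the --server and --port options:

logger --server 127.0.0.1 --port 5066 --tcp "test message for the fwlog pipeline"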
Like most next-generation firewalls, the firewall in our scenario produces its logs in key=value format.
device="MWE" date=2020-01-16 time=09:45:10 timezone="+06" device_name="MT610" device_id=F65278RCD6FHG69 log_id=050901616001 log_type="Content Filtering" log_component="HTTP" log_subtype="Allowed" status="" priority=Information fw_rule_id=636 user_name="" user_gp="" iap=12 category="Search Engines" category_type="Acceptable" url="https://clients4.google.com/" contenttype="" override_token="" httpresponsecode="" src_ip=192.168.22.16 dst_ip=172.217.19.174 protocol="TCP" src_port=55927 dst_port=443 sent_bytes=3295 recv_bytes=5825 domain=clients4.google.com exceptions="" activityname="" reason="" user_agent="" status_code="200" transactionid="" referer="" download_file_name="" download_file_type="" upload_file_name="" upload_file_type="" con_id=3915484168 application="" app_is_cloud=0 override_name="" override_authorizer=""
We use the kv filter and the cidr filter to process the raw logs into a searchable form.
Logstash – KV Filter
This filter helps automatically parse messages (or specific event fields) which are of the foo=bar variety.
For example, if you have a log message which contains ip=1.2.3.4 error=REFUSED, you can parse those automatically by configuring:
filter {
  kv { }
}
The above will result in a message of ip=1.2.3.4 error=REFUSED having the fields:
- ip: 1.2.3.4
- error: REFUSED
Code
mutate {
  gsub => [
    # replace all "= " with double quotes to truly indicate no value
    "message", "= ", '="" '
  ]
}
kv {
  id => "sophos_kv"
  source => "message"
  trim_key => " "
  trim_value => " "
  value_split => "="
  field_split => " "
}
Logstash – CIDR Filter
The CIDR filter is for checking IP addresses in events against a list of network blocks that might contain it. Multiple addresses can be checked against multiple networks, any match succeeds. Upon success additional tags and/or fields can be added to the event.
Here the CIDR filter is used to tag the source and destination IPs so we can distinguish whether each one is an internal or external IP.
Code
#now check if source IP is a private IP, if so, tag it
cidr {
  add_tag => [ "src_internalIP" ]
  address => [ "%{src_ip}" ]
  network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
}
Getting the blacklist block
Since the IP list is dynamic, we fetch it on a daily basis. Once the updated list is downloaded, it is loaded into the memcached cache. A Bash script is created and registered as a cron task to fetch the IP list and update memcached.
Memcached – Preparation
Memcached installation and configuration are simple. It is also supported by Logstash through a filter plugin, which makes it perfect for our requirement. It comes with the huge additional benefit of storing the data in memory, so lookups from Logstash will be blazing fast.
The Memcached application is a very simple key-value store running in memory; you can telnet into the application, which runs by default on port 11211.
The application supports only a handful of commands. The ones we need here are get and set, both of which are quite self-explanatory.
Installation
yum install memcached
Once it is installed, start the service using
systemctl start memcached
Then enable the service at startup using
systemctl enable memcached
Once the service is started, test the connection using telnet:
user@host$ telnet 127.0.0.1 11211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
The set command will be used by our Python script to load the data into the store.
The get command will be used by the Logstash filter plugin to query the store for a specific IP and return the result back to Logstash.
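For illustration, here is what a manual set/get exchange looks like inside the telnet session; the key is an example IP and the value matches what our script will store later. The set command takes the key, flags, expiry in seconds (0 = never) and the value's length in bytes, with the value itself on the following line:

set 8.8.8.8 0 0 14
Alienvault_IOC
STORED
get 8.8.8.8
VALUE 8.8.8.8 0 14
Alienvault_IOC
END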
Python – Memcached integration
There are many Python modules available for interacting with memcached. Here we use pymemcache, a simple yet powerful library for the get/set operations of memcached.
Installation
pip install pymemcache
https://pymemcache.readthedocs.io/en/latest/getting_started.html
File – /root/iptomemcache.py
from pymemcache.client.base import Client

# location of the memcached application
client = Client(('127.0.0.1', 11211))

# load each IP from the downloaded list into memcached,
# skipping the comment/header lines of the ipset file
with open("/root/ip.txt", "r") as f:
    for line in f:
        ip = line.strip()
        if ip and not ip.startswith("#"):
            client.set(ip, "Alienvault_IOC")
Shell script to fetch the IP list and update memcached
File – /root/alien_vault.sh
#!/bin/bash
# restart memcached to flush yesterday's entries
systemctl restart memcached
# fetch the latest AlienVault reputation list from FireHOL
wget https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/alienvault_reputation.ipset -O /root/ip.txt
# load the IPs into memcached
python /root/iptomemcache.py
# print item stats as a quick sanity check
{ echo "stats items"; sleep 1; } | telnet 127.0.0.1 11211
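To spot-check that an address from the list actually made it into the cache, you can query the first non-comment entry of the downloaded file the same way:

{ echo "get $(grep -v '^#' /root/ip.txt | head -n 1)"; sleep 1; } | telnet 127.0.0.1 11211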
Cron job entry to run the script daily at 7:00 am
00 7 * * * root /root/alien_vault.sh >> /var/log/alien_vault_update.log 2>&1
Logstash – Memcached filter
https://www.elastic.co/guide/en/logstash/current/plugins-filters-memcached.html
The Memcached filter provides integration with external data in Memcached.
It currently provides the following facilities:
- get: get values for one or more memcached keys and inject them into the event at the provided paths
- set: set values from the event to the corresponding memcached keys
Installation
/usr/share/logstash/bin/logstash-plugin install logstash-filter-memcached
Filter configuration
# don't run the memcached lookup if the destination is an internal IP
if "dst_internalIP" not in [tags] {
  memcached {
    hosts => ["127.0.0.1:11211"]
    get => { "%{dst_ip}" => "[ioc_ip]" }
  }
  if ![ioc_ip] {
    mutate {
      add_field => { "[ioc_ip]" => "None" }
    }
  }
}
Here, once an IP is identified as external, it is looked up in the memcached database. If the lookup succeeds, the value of the key=value pair is returned and stored in a separate field named ioc_ip; if not, the ioc_ip field is set to "None".
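For a hit, the enriched event would carry the new field roughly as sketched below (the field values are illustrative, loosely based on the sample log earlier; 203.0.113.25 is a documentation-range address standing in for a blacklisted IP):

{
  "src_ip": "192.168.22.16",
  "dst_ip": "203.0.113.25",
  "tags": ["src_internalIP", "dst_geoip"],
  "ioc_ip": "Alienvault_IOC"
}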
Full code
if [type] in ["fwlog"] {
  mutate {
    gsub => [
      # replace all "= " with double quotes to truly indicate no value
      "message", "= ", '="" '
    ]
  }
  kv {
    id => "sophos_kv"
    source => "message"
    trim_key => " "
    trim_value => " "
    value_split => "="
    field_split => " "
  }
  # now check if source IP is a private IP, if so, tag it
  cidr {
    add_tag => [ "src_internalIP" ]
    address => [ "%{src_ip}" ]
    network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
  }
  # don't run geoip if it's an internal IP, otherwise find the GeoIP location
  if "src_internalIP" not in [tags] {
    geoip {
      add_tag => [ "src_geoip" ]
      source => "src_ip"
    }
  } else {
    # check the DST IP now; if it is a private IP, tag it
    if [dst_ip] {
      cidr {
        add_tag => [ "dst_internalIP" ]
        address => [ "%{dst_ip}" ]
        network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
      }
    }
    # don't run geoip or the memcached lookup if it's an internal IP
    if "dst_internalIP" not in [tags] {
      geoip {
        add_tag => [ "dst_geoip" ]
        source => "dst_ip"
      }
      memcached {
        hosts => ["127.0.0.1:11211"]
        get => { "%{dst_ip}" => "[ioc_ip]" }
      }
      if ![ioc_ip] {
        mutate {
          add_field => { "[ioc_ip]" => "None" }
        }
      }
    }
  }
}
Now search the firewall log index for the additional field created for traffic to external IPs. We need to refresh the index pattern's field list in Kibana to make the new field searchable.
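For example, a Lucene query in Kibana's Discover view such as the following surfaces only the events that matched the blacklist:

ioc_ip.keyword: "Alienvault_IOC"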
Indication of traffic to Blocked IP lists
ElastAlert Rule Setup
Please follow https://github.com/Yelp/elastalert for ElastAlert installation and setup.
Custom Rule to send Slack Alerts
name: Threat Presence
type: any
index: fwlog-*
match_enhancements:
  - "elastalert_modules.time_enhancement.TimeEnhancement"
filter:
- query:
    query_string:
      query: 'ioc_ip.keyword: "Alienvault_IOC"'
alert:
- "slack"
alert_text: |
  Alienvault Threat Activity!
  At {}, IP {} has traffic to the malicious IP {}
  Message - {}
alert_text_args: ["local-timestamp", "src_ip", "dst_ip", "message"]
alert_text_type: alert_text_only
alert_subject: "Alienvault - Threat Event"
slack_webhook_url:
- "https://hooks.slack.com/services/TLE0VQ39B/BLER39S9/NmUOJJJVADSPOLKJ87xELyU"
slack_icon_url_override: https://avatars.slack-edge.com/2019-07-14/69sdfsdf5654_2as235751a4c7sbdf4_48.png
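Before enabling the rule, it can be dry-run against the index with ElastAlert's bundled test utility; the file name here is just an assumed location for the rule above:

elastalert-test-rule rules/threat_presence.yaml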
Slack Configuration
We are using Incoming Webhooks to send data to the Slack channels.
Incoming Webhooks are a simple way to post messages from apps into Slack. Creating an Incoming Webhook gives you a unique URL to which you send a JSON payload with the message text and some options.
https://api.slack.com/messaging/webhooks
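You can verify the webhook independently of ElastAlert with a quick test POST (replace the URL with the one generated for your channel):

curl -X POST -H 'Content-type: application/json' \
  --data '{"text": "Test alert from the threat hunting platform"}' \
  https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX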
The generated webhook URL is used in the ElastAlert rule to send alerts to the Slack channel whenever traffic originates from the internal network to any of the blacklisted IPs.