Zero-Cost Threat Hunting with Elastic Stack

Setting up a Zero-Cost Threat Hunting Platform with the Elastic Stack and the AlienVault Reputation List

Elastic Stack is an awesome suite of products that has been used for all kinds of analysis work since its inception, thanks to its amazing search and visualization capabilities. Here we leverage the Elastic Stack, together with a few other components, to build a threat hunting platform: we maintain a reliable blocklist of malicious IPs obtained from OSINT and analyze network traffic in real time against it, flagging any traffic to these IPs. To achieve this we use FireHOL, an OSINT project that analyses security IP feeds, mainly related to online attacks, online service abuse, malware, botnets and other cybercrime activities. It publishes several lists; in our case we are going to use the alienvault_reputation list.

http://iplists.firehol.org/

Components Used

Elasticsearch

Elasticsearch will act as our log repository. It’s incredibly powerful and versatile, and when coupled with Logstash for log ingestion and Kibana for visualization, it provides a robust platform for all types of data.

Logstash

A Logstash pipeline is made up of three parts: input, filter and output. The input section defines the source of the logging data. The filter section can parse, normalize, transform or otherwise prepare the data before it is sent to Elasticsearch or any other analytics engine. The output section defines where the data processed by Logstash is stored; this can be Elasticsearch, Kafka or various other destinations. Please refer to the output plugin documentation for the supported options.
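
As a sketch, a minimal pipeline with all three sections might look like the following (the Elasticsearch host and the fwlog index name are assumptions chosen to match the rest of this setup):

    input {
      syslog { port => 5066 }
    }
    filter {
      kv { }
    }
    output {
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "fwlog-%{+YYYY.MM.dd}"
      }
    }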

Kibana

Kibana is an open source data visualization dashboard for Elasticsearch. It provides visualization capabilities on top of the content indexed in an Elasticsearch cluster. Users can create bar, line and scatter plots, or pie charts and maps, on top of large volumes of data.

ElastAlert

ElastAlert is an open source project started by the engineers at Yelp to provide an alerting mechanism for Elasticsearch. It’s an independent project that doesn’t need to run on the same server. It simply queries Elasticsearch through the REST API and has numerous outputs to alert on a match. One of those outputs will feed the information into Slack.

Slack

Slack is a cloud-based proprietary instant messaging platform developed by Slack Technologies.

Memcached

Free & open source, high-performance, distributed memory object caching system, generic in nature, but intended for use in speeding up dynamic web applications by alleviating database load. Memcached is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of database calls, API calls, or page rendering.

Memcached is simple yet powerful. Its simple design promotes quick deployment, ease of development, and solves many problems facing large data caches. Its API is available for most popular languages.

Cron

The software utility cron is a time-based job scheduler in Unix-like computer operating systems. Users that set up and maintain software environments use cron to schedule jobs to run periodically at fixed times, dates, or intervals.

Pymemcache

A comprehensive, fast, pure-Python memcached client library.

https://pymemcache.readthedocs.io/en/latest/getting_started.html

from pymemcache.client.base import Client

# connect to the local memcached instance and do a basic set/get round trip
client = Client(('localhost', 11211))
client.set('some_key', 'some_value')
result = client.get('some_key')

Log collection

The Logstash syslog input plugin is configured to receive the logs from the firewall, which sends them in syslog format:

input {
  syslog {
    port => 5066
    type => "fwlog"
  }
}

Like most next-generation firewalls, the firewall in our scenario produces its logs in key=value format.

device="MWE" date=2020-01-16 time=09:45:10 timezone="+06" device_name="MT610" device_id=F65278RCD6FHG69 log_id=050901616001 log_type="Content Filtering" log_component="HTTP" log_subtype="Allowed" status="" priority=Information fw_rule_id=636 user_name="" user_gp="" iap=12 category="Search Engines" category_type="Acceptable" url="https://clients4.google.com/" contenttype="" override_token="" httpresponsecode="" src_ip=192.168.22.16 dst_ip=172.217.19.174 protocol="TCP" src_port=55927 dst_port=443 sent_bytes=3295 recv_bytes=5825 domain=clients4.google.com exceptions="" activityname="" reason="" user_agent="" status_code="200" transactionid="" referer="" download_file_name="" download_file_type="" upload_file_name="" upload_file_type="" con_id=3915484168 application="" app_is_cloud=0 override_name="" override_authorizer=""

We use the kv filter and the cidr filter to process the raw logs into a searchable form.

Logstash – KV Filter

This filter helps automatically parse messages (or specific event fields) which are of the foo=bar variety.

For example, if you have a log message which contains ip=1.2.3.4 error=REFUSED, you can parse those automatically by configuring:

    filter {
      kv { }
    }

The above will result in a message of ip=1.2.3.4 error=REFUSED having the fields:

  • ip: 1.2.3.4
  • error: REFUSED

Code

    mutate {
      # replace all "= " with '=""' to explicitly indicate an empty value
      gsub => [ "message", "= ", '="" ' ]
    }
    kv {
      id => "sophos_kv"
      source => "message"
      trim_key => " "
      trim_value => " "
      value_split => "="
      field_split => " "
    }

Logstash CIDR Filter

The CIDR filter is for checking IP addresses in events against a list of network blocks that might contain it. Multiple addresses can be checked against multiple networks, any match succeeds. Upon success additional tags and/or fields can be added to the event.

Here the cidr filter is used to tag the source and destination IPs, distinguishing internal addresses from external ones.

Code

    # now check if the source IP is a private IP; if so, tag it
    cidr {
      add_tag => [ "src_internalIP" ]
      address => [ "%{src_ip}" ]
      network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
    }

Getting the blacklist block

Since the IP list is dynamic, we fetch it on a daily basis. Once the updated list is downloaded, it is loaded into the memcached cache. A Bash script, registered as a cron job, fetches the IP list and updates memcached.
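
For reference, the downloaded alienvault_reputation.ipset file begins with a block of header comment lines (each starting with #) describing the list, followed by one IP address per line, roughly like this (the addresses shown are illustrative):

    # alienvault_reputation
    # (header comments describing the list and when it was last updated)
    1.2.3.4
    5.6.7.8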

Memcached – Preparation

Memcached installation and configuration are simple. Logstash also provides a memcached filter plugin, which makes it a perfect fit for our requirement. It comes with the huge additional benefit of storing the data in memory, so lookups from Logstash will be blazing fast.

The Memcached application is a very simple key-value store running in memory; you can telnet into it on its default port, 11211.

The application supports only a handful of commands. The ones we need here are get and set, both of which are quite self-explanatory.

https://memcached.org/

Installation

yum install memcached

Once it is installed, start the service using

systemctl start memcached

Then enable the service at startup with

systemctl enable memcached

Once the service is started, test the connection using telnet:

user@host$ telnet 127.0.0.1 11211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
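
Inside the telnet session you can exercise both commands using the raw protocol; set takes a key, flags, an expiry in seconds and the value length in bytes, with the value itself on the next line (test_key and test_data below are arbitrary examples):

set test_key 0 900 9
test_data
STORED
get test_key
VALUE test_key 0 9
test_data
END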

The set command will be used by our Python script to load the data into the store.

The get command will be used by the Logstash filter plugin to query the store for a specific IP and return the result to Logstash.

Python – Memcached integration

There are many Python modules available for interacting with memcached. Here we use pymemcache, a simple yet powerful library for memcached get/set operations.

Installation

pip install pymemcache

https://pymemcache.readthedocs.io/en/latest/getting_started.html

File – /root/iptomemcache.py

from pymemcache.client.base import Client

# location of the memcached instance
client = Client(('127.0.0.1', 11211))

# load the downloaded FireHOL list and tag every address in memcached
with open("/root/ip.txt", "r") as f:
    for line in f.read().splitlines():
        # skip the header comment lines and any blank lines
        if not line or line.startswith("#"):
            continue
        client.set(line, "Alienvault_IOC")
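
To verify the load, you can query one of the addresses from ip.txt back out of the cache; a minimal check (the address below is purely illustrative):

from pymemcache.client.base import Client

client = Client(('127.0.0.1', 11211))
# any address present in ip.txt should return the tag we set
print(client.get('1.2.3.4'))  # b'Alienvault_IOC' if 1.2.3.4 is in the list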

Shell script to fetch the IP list and update memcached

File – /root/alien_vault.sh

#!/bin/bash
# restart memcached to flush yesterday's entries before loading the fresh list
systemctl restart memcached
# fetch the latest alienvault_reputation list from FireHOL
wget https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/alienvault_reputation.ipset -O /root/ip.txt
# load the addresses into memcached
python /root/iptomemcache.py
# print item stats so the log shows that entries were loaded
{ echo "stats items"; sleep 1; } | telnet 127.0.0.1 11211

Cron job entry (in /etc/crontab) to run the script daily at 7.00 am. Make sure the script is executable first (chmod +x /root/alien_vault.sh).

00 7 * * * root /root/alien_vault.sh >> /var/log/alien_vault_update.log

Logstash – Memcached filter

https://www.elastic.co/guide/en/logstash/current/plugins-filters-memcached.html

The Memcached filter provides integration with external data in Memcached.

It currently provides the following facilities:

  • get: get values for one or more memcached keys and inject them into the event at the provided paths
  • set: set values from the event to the corresponding memcached keys

Installation

/usr/share/logstash/bin/logstash-plugin install logstash-filter-memcached

Filter configuration

      # don't run the memcached lookup if the destination is an internal IP
      if "dst_internalIP" not in [tags] {
        memcached {
          hosts => ["127.0.0.1:11211"]
          get => { "%{dst_ip}" => "[ioc_ip]" }
        }
        if ![ioc_ip] {
          mutate {
            add_field => { "[ioc_ip]" => "None" }
          }
        }
      }

Once an IP is identified as an external IP, it is looked up in the memcached store. If the lookup succeeds, the value of the key=value pair is returned and stored in a separate field named ioc_ip.

Full code

if [type] in ["fwlog"] {
  mutate {
    # replace all "= " with '=""' to explicitly indicate an empty value
    gsub => [ "message", "= ", '="" ' ]
  }
  kv {
    id => "sophos_kv"
    source => "message"
    trim_key => " "
    trim_value => " "
    value_split => "="
    field_split => " "
  }
  # now check if the source IP is a private IP; if so, tag it
  cidr {
    add_tag => [ "src_internalIP" ]
    address => [ "%{src_ip}" ]
    network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
  }
  # don't run geoip if the source is an internal IP, otherwise find the GeoIP location
  if "src_internalIP" not in [tags] {
    geoip {
      add_tag => [ "src_geoip" ]
      source => "src_ip"
    }
  }
  else {
    # check the destination IP now; if it is a private IP, tag it
    if [dst_ip] {
      cidr {
        add_tag => [ "dst_internalIP" ]
        address => [ "%{dst_ip}" ]
        network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
      }
    }
    # don't run geoip or the memcached lookup if the destination is an internal IP
    if "dst_internalIP" not in [tags] {
      geoip {
        add_tag => [ "dst_geoip" ]
        source => "dst_ip"
      }
      memcached {
        hosts => ["127.0.0.1:11211"]
        get => { "%{dst_ip}" => "[ioc_ip]" }
      }
      if ![ioc_ip] {
        mutate {
          add_field => { "[ioc_ip]" => "None" }
        }
      }
    }
  }
}

Now search the firewall log index for the additional field created for traffic to external IPs. We need to refresh the index pattern's field list in Kibana to make the new ioc_ip field searchable; a query such as ioc_ip: "Alienvault_IOC" will then surface the matching events.

Indication of traffic to Blocked IP lists

Elastalert Rule Setup

Please follow https://github.com/Yelp/elastalert for ElastAlert installation.

Custom Rule to send Slack Alerts

name: Threat Presence
type: any
index: fwlog-*
match_enhancements:
- "elastalert_modules.time_enhancement.TimeEnhancement"
filter:
- query:
    query_string:
      query: 'ioc_ip.keyword: "Alienvault_IOC"'
alert:
- "slack"
alert_text: |
    Alienvault Threat Activity!
    At {}, IP {} has traffic to the malicious IP {}
    Message - {}
alert_text_args: ["local-timestamp", "src_ip", "dst_ip", "message"]
alert_text_type: alert_text_only
alert_subject: "Alienvault - Threat Event"
slack_webhook_url:
- "https://hooks.slack.com/services/TLE0VQ39B/BLER39S9/NmUOJJJVADSPOLKJ87xELyU"
slack_icon_url_override: https://avatars.slack-edge.com/2019-07-14/69sdfsdf5654_2as235751a4c7sbdf4_48.png
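
Before scheduling the rule, it can be sanity-checked against recent data with the elastalert-test-rule utility that ships with ElastAlert (the rule filename below is an assumption):

elastalert-test-rule threat_presence.yaml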

Slack Configuration

We are using Incoming Webhooks to send data to the Slack channels.

Incoming Webhooks are a simple way to post messages from apps into Slack. Creating an Incoming Webhook gives you a unique URL to which you send a JSON payload with the message text and some options.

https://api.slack.com/messaging/webhooks
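
To verify the webhook before pointing ElastAlert at it, you can post a test payload yourself. This is a minimal sketch using the Python requests library, with a placeholder webhook URL:

import requests

# placeholder URL - replace with the Incoming Webhook URL generated for your workspace
webhook_url = "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXX"
payload = {"text": "Alienvault - Threat Event: test message"}

response = requests.post(webhook_url, json=payload)
response.raise_for_status()  # Slack responds with HTTP 200 and body "ok" on success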

The generated webhook URL is used in the ElastAlert rule to send alerts to the Slack channel whenever traffic originates from the internal network to any of the blacklisted IPs.