Module middleware

Expand source code
import json

from flask import Flask, jsonify, request
import numpy as np
from elasticsearch import Elasticsearch
from datetime import datetime
from tensorflow.keras import models


# Flask application object; the `server` view defined in this module is
# registered on it through `@app.route`.
app = Flask(__name__)


def string_to_ascii(string):
    """

    Encode a domain name as an array holding the code point of each character.

    Args:
        string: Contains the Domain Name entered by the user.

    Returns:
        A one-dimensional float64 numpy array with one entry per character of
        the Domain Name, each entry being `ord` of that character. An empty
        name gives an empty array.

    """
    # float64 matches the dtype np.zeros() produces by default.
    return np.array([ord(char) for char in string], dtype=np.float64)


def mal_and_benign_list_creation(es):
    """

    Function that creates an empty list for malicious and benign domains.

    Each of the 'mal' and 'benign' indices is checked on its own, and an
    empty document with id 1 is indexed into whichever one is missing. An
    index that already exists is left untouched.

    Args:
        es: Contains the Elasticsearch object.

    Returns:
        Not applicable.

    """
    # Fetch the index listing once instead of once per membership test.
    existing = es.indices.get('*')

    # Checking each index independently means a partially initialised
    # cluster (only one of the two present) still gets the missing one.
    for index_name in ('mal', 'benign'):
        if index_name not in existing:
            es.index(index=index_name, id=1, body={})


def vetted_list_creation(es):
    """

    Function that creates an empty list for not vetted, benign, honeypot and
    malicious domains.

    Each of the 'not_vetted', 'benign_vet', 'honeypot' and 'blacklist'
    indices is checked on its own, and an empty document with id 1 is indexed
    into every one that is missing. Indices that already exist are left
    untouched.

    Args:
        es: Contains the Elasticsearch object.

    Returns:
        Not applicable.

    """
    # Fetch the index listing once instead of once per membership test.
    existing = es.indices.get('*')

    # Checking each index independently means a partially initialised
    # cluster still gets whichever indices are missing.
    for index_name in ('not_vetted', 'benign_vet', 'honeypot', 'blacklist'):
        if index_name not in existing:
            es.index(index=index_name, id=1, body={})


def list_updation(es, domain_name, send):
    """

    Function that updates the malicious and benign lists depending on how
    many people have queried that particular domain and also the model
    confidence score regarding whether the domain is malicious or not.

    A score below 0.5 is recorded in the 'benign' index, any other score in
    the 'mal' index. A domain already present only has its 'count'
    incremented. A new domain is stored with a count of 1 and a 'status'
    holding the confidence as a percentage rounded to two decimals; when
    that confidence is below 90 the domain is also written to the
    'not_vetted' index with its class and confidence.

    Args:
        es: Contains the Elasticsearch object.
        domain_name: Contains the domain name corresponding to which the list
                     will be updated.
        send: Contains the model prediction score whether the domain is
              malicious or benign.

    Returns:
        Not Applicable

    """

    # Read first, before the class-specific list, on every call.
    pending_review = es.get(index='not_vetted', id=1)['_source']

    score = float(send)
    if score < 0.5:
        index_name, label = 'benign', 'Benign'
    else:
        index_name, label = 'mal', 'Malicious'

    entries = es.get(index=index_name, id=1)['_source']

    if domain_name in entries:
        entries[domain_name]['count'] += 1
    else:
        # Confidence in the predicted class: distance from the opposite end
        # of the [0, 1] score range, expressed as a percentage.
        class_probability = (1 - score) if label == 'Benign' else score
        confidence = float(format(class_probability * 100, '.2f'))
        entries[domain_name] = {'count': 1, 'status': confidence}

        if confidence < 90:
            pending_review[domain_name] = {'class': label, 'acc': confidence}
            es.update(index='not_vetted', id=1,
                      body={'doc': {domain_name: pending_review[domain_name]}})

    # Partial update carrying only this domain's entry.
    es.update(index=index_name, id=1,
              body={'doc': {domain_name: entries[domain_name]}})


def update_historical_analysis(es, domain_name, ip, send, date_time):
    """

    Function that records when a particular domain was queried and by which
    IP address, in an index named after the domain itself.

    Three counters are kept in the document with id 1: queries per minute
    (nested under date, then hour, then minute), queries per day (nested
    under year, then month, then day) and queries per IP address (under
    'count'). The model score is stored under 'status' only when the
    document for a domain is first created.

    Args:
        es: Contains the Elasticsearch object.
        domain_name: Contains the domain name corresponding to which the list
                     will be updated.
        ip: Contains the IP address of the machine querying that domain.
        send: Contains the model prediction score whether the domain is
              malicious or benign.
        date_time: Contains the date as well as the time the request is
                   processed.

    Returns:
        Not applicable.

    """

    calendar_day = date_time.date()
    clock = date_time.time()

    # All keys are strings because they become JSON object keys.
    date = str(calendar_day)
    year, month, day = (str(calendar_day.year), str(calendar_day.month),
                        str(calendar_day.day))
    hour, minutes = str(clock.hour), str(clock.minute)

    # First query for this domain: create its document with every counter
    # starting at 1.
    if domain_name not in es.indices.get('*'):
        es.index(index=domain_name, id=1,
                 body={date: {hour: {minutes: 1}},
                       year: {month: {day: 1}},
                       'count': {ip: 1},
                       'status': send})
        return

    body = es.get(index=domain_name, id=1)['_source']

    # Per-minute counter, creating missing date/hour levels on the way.
    per_minute = body.setdefault(date, {}).setdefault(hour, {})
    per_minute[minutes] = per_minute.get(minutes, 0) + 1

    # Per-day counter, creating missing year/month levels on the way.
    per_day = body.setdefault(year, {}).setdefault(month, {})
    per_day[day] = per_day.get(day, 0) + 1

    # Per-IP counter; the 'count' mapping is expected to exist already.
    per_ip = body['count']
    per_ip[ip] = per_ip.get(ip, 0) + 1

    # Send only the counters touched by this query, plus the IP mapping.
    es.update(index=domain_name, id=1,
              body={'doc': {date: {hour: {minutes: per_minute[minutes]}},
                            year: {month: {day: per_day[day]}},
                            'count': per_ip}})


@app.route('/', methods=['GET', 'POST'])
def server():
    """

    Function that processes the request and then sends back a JSON message via
    the hyper text transfer protocol to the ML Bridge Plugin that contains the
    confidence score regarding whether the domain is malicious or benign.

    The POST body must be a JSON object with at least two entries: the value
    of the first entry is read as the domain name and the value of the second
    as the IP address of the querying machine. The full domain name is
    encoded into a zero-padded 256-element vector, reshaped to
    (1, 16, 16, 1) and passed to the model. A leading 'www.' is stripped
    from the name before it is stored in Elasticsearch.

    Returns:
        A JSON message that contains the confidence score regarding whether the
        request is of a malicious or a benign domain (key 'p'). Requests that
        are not POST receive a 405 JSON error, and POST requests without a
        usable JSON payload receive a 400 JSON error.

    """

    # A view must always return a response. Without this guard a GET request
    # fell through to the end of the function and returned None, which Flask
    # reports as a server error.
    if request.method != 'POST':
        return jsonify({'error': 'send the query as a JSON POST request'}), 405

    # silent=True yields None for a missing or malformed body so it can be
    # rejected explicitly instead of failing on `.keys()` below.
    domain_json = request.get_json(silent=True)
    if not isinstance(domain_json, dict) or len(domain_json) < 2:
        return jsonify({'error': 'expected a JSON object holding the domain '
                                 'name and the client IP'}), 400

    # The payload is read positionally: first value is the domain, second
    # value is the client IP.
    key = list(domain_json.keys())
    queried_name = domain_json[key[0]]
    ip = domain_json[key[1]]
    if not isinstance(queried_name, str):
        return jsonify({'error': 'the domain name must be a string'}), 400

    es = Elasticsearch()

    # NOTE(review): the model is loaded from disk on every request; consider
    # loading it once at start-up if request latency matters.
    model = models.load_model(
        '../../../mlbridge-machine-learning/saved_models/dns_alert_model.hdf5')

    mal_and_benign_list_creation(es)
    vetted_list_creation(es)

    # Strip only a leading 'www.'. Splitting on every occurrence of 'www.'
    # mangled names that contain it elsewhere (e.g. 'www.www.x.com' -> '').
    if queried_name.startswith('www.'):
        domain_name = queried_name[len('www.'):]
    else:
        domain_name = queried_name

    date_time = datetime.now()

    # The model input has a fixed size of 256 values; truncate longer names
    # instead of failing on the slice assignment.
    codes = string_to_ascii(queried_name)[:256]
    input_ = np.zeros(256)
    input_[:len(codes)] = codes
    input_ = np.reshape(input_, (1, 16, 16, 1))
    send = str(model.predict(input_)[0, 0])

    list_updation(es, domain_name, send)
    update_historical_analysis(es, domain_name, ip, send, date_time)

    return jsonify({'p': send})


# Start Flask's built-in server only when this file is executed directly.
# NOTE(review): debug=True turns on Flask's debug mode — presumably intended
# for local development only; confirm before exposing this service.
if __name__ == '__main__':
    app.run(debug=True)

Functions

def list_updation(es, domain_name, send)

Function that updates the malicious and benign lists depending on how many people have queried that particular domain and also the model confidence score regarding whether the domain is malicious or not.

Args

es
Contains the Elasticsearch object.
domain_name
Contains the domain name corresponding to which the list will be updated.
send
Contains the model prediction score whether the domain is malicious or benign.

Returns

Not Applicable

Expand source code
def list_updation(es, domain_name, send):
    """

    Function that updates the malicious and benign lists depending on how
    many people have queried that particular domain and also the model
    confidence score regarding whether the domain is malicious or not.

    A score below 0.5 is recorded in the 'benign' index, any other score in
    the 'mal' index. A domain already present only has its 'count'
    incremented. A new domain is stored with a count of 1 and a 'status'
    holding the confidence as a percentage rounded to two decimals; when
    that confidence is below 90 the domain is also written to the
    'not_vetted' index with its class and confidence.

    Args:
        es: Contains the Elasticsearch object.
        domain_name: Contains the domain name corresponding to which the list
                     will be updated.
        send: Contains the model prediction score whether the domain is
              malicious or benign.

    Returns:
        Not Applicable

    """

    # Read first, before the class-specific list, on every call.
    pending_review = es.get(index='not_vetted', id=1)['_source']

    score = float(send)
    if score < 0.5:
        index_name, label = 'benign', 'Benign'
    else:
        index_name, label = 'mal', 'Malicious'

    entries = es.get(index=index_name, id=1)['_source']

    if domain_name in entries:
        entries[domain_name]['count'] += 1
    else:
        # Confidence in the predicted class: distance from the opposite end
        # of the [0, 1] score range, expressed as a percentage.
        class_probability = (1 - score) if label == 'Benign' else score
        confidence = float(format(class_probability * 100, '.2f'))
        entries[domain_name] = {'count': 1, 'status': confidence}

        if confidence < 90:
            pending_review[domain_name] = {'class': label, 'acc': confidence}
            es.update(index='not_vetted', id=1,
                      body={'doc': {domain_name: pending_review[domain_name]}})

    # Partial update carrying only this domain's entry.
    es.update(index=index_name, id=1,
              body={'doc': {domain_name: entries[domain_name]}})
def mal_and_benign_list_creation(es)

Function that creates an empty list for malicious and benign domains.

Args

es
Contains the Elasticsearch object.

Returns

Not applicable.

Expand source code
def mal_and_benign_list_creation(es):
    """

    Function that creates an empty list for malicious and benign domains.

    Each of the 'mal' and 'benign' indices is checked on its own, and an
    empty document with id 1 is indexed into whichever one is missing. An
    index that already exists is left untouched.

    Args:
        es: Contains the Elasticsearch object.

    Returns:
        Not applicable.

    """
    # Fetch the index listing once instead of once per membership test.
    existing = es.indices.get('*')

    # Checking each index independently means a partially initialised
    # cluster (only one of the two present) still gets the missing one.
    for index_name in ('mal', 'benign'):
        if index_name not in existing:
            es.index(index=index_name, id=1, body={})
def server()

Function that processes the request and then sends back a JSON message via the hyper text transfer protocol to the ML Bridge Plugin that contains the confidence score regarding whether the domain is malicious or benign.

Returns

A JSON message that contains the confidence score regarding whether the request is of a malicious or a benign domain.

Expand source code
@app.route('/', methods=['GET', 'POST'])
def server():
    """

    Function that processes the request and then sends back a JSON message via
    the hyper text transfer protocol to the ML Bridge Plugin that contains the
    confidence score regarding whether the domain is malicious or benign.

    The POST body must be a JSON object with at least two entries: the value
    of the first entry is read as the domain name and the value of the second
    as the IP address of the querying machine. The full domain name is
    encoded into a zero-padded 256-element vector, reshaped to
    (1, 16, 16, 1) and passed to the model. A leading 'www.' is stripped
    from the name before it is stored in Elasticsearch.

    Returns:
        A JSON message that contains the confidence score regarding whether the
        request is of a malicious or a benign domain (key 'p'). Requests that
        are not POST receive a 405 JSON error, and POST requests without a
        usable JSON payload receive a 400 JSON error.

    """

    # A view must always return a response. Without this guard a GET request
    # fell through to the end of the function and returned None, which Flask
    # reports as a server error.
    if request.method != 'POST':
        return jsonify({'error': 'send the query as a JSON POST request'}), 405

    # silent=True yields None for a missing or malformed body so it can be
    # rejected explicitly instead of failing on `.keys()` below.
    domain_json = request.get_json(silent=True)
    if not isinstance(domain_json, dict) or len(domain_json) < 2:
        return jsonify({'error': 'expected a JSON object holding the domain '
                                 'name and the client IP'}), 400

    # The payload is read positionally: first value is the domain, second
    # value is the client IP.
    key = list(domain_json.keys())
    queried_name = domain_json[key[0]]
    ip = domain_json[key[1]]
    if not isinstance(queried_name, str):
        return jsonify({'error': 'the domain name must be a string'}), 400

    es = Elasticsearch()

    # NOTE(review): the model is loaded from disk on every request; consider
    # loading it once at start-up if request latency matters.
    model = models.load_model(
        '../../../mlbridge-machine-learning/saved_models/dns_alert_model.hdf5')

    mal_and_benign_list_creation(es)
    vetted_list_creation(es)

    # Strip only a leading 'www.'. Splitting on every occurrence of 'www.'
    # mangled names that contain it elsewhere (e.g. 'www.www.x.com' -> '').
    if queried_name.startswith('www.'):
        domain_name = queried_name[len('www.'):]
    else:
        domain_name = queried_name

    date_time = datetime.now()

    # The model input has a fixed size of 256 values; truncate longer names
    # instead of failing on the slice assignment.
    codes = string_to_ascii(queried_name)[:256]
    input_ = np.zeros(256)
    input_[:len(codes)] = codes
    input_ = np.reshape(input_, (1, 16, 16, 1))
    send = str(model.predict(input_)[0, 0])

    list_updation(es, domain_name, send)
    update_historical_analysis(es, domain_name, ip, send, date_time)

    return jsonify({'p': send})
def string_to_ascii(string)

Function that converts the domain name to an integer array of ASCII values.

Args

string
Contains the Domain Name entered by the user.

Returns

A numpy array of ASCII values corresponding to the characters of the Domain Name

Expand source code
def string_to_ascii(string):
    """

    Encode a domain name as an array holding the code point of each character.

    Args:
        string: Contains the Domain Name entered by the user.

    Returns:
        A one-dimensional float64 numpy array with one entry per character of
        the Domain Name, each entry being `ord` of that character. An empty
        name gives an empty array.

    """
    # float64 matches the dtype np.zeros() produces by default.
    return np.array([ord(char) for char in string], dtype=np.float64)
def update_historical_analysis(es, domain_name, ip, send, date_time)

Function that updates the date and time at which a particular domain is queried along with the IP address of the machine that queries that particular domain. Moreover, the model confidence score regarding whether the domain is malicious or not is also updated.

Args

es
Contains the Elasticsearch object.
domain_name
Contains the domain name corresponding to which the list will be updated.
ip
Contains the IP address of the machine querying that domain.
send
Contains the model prediction score whether the domain is malicious or benign.
date_time
Contains the date as well as the time the request is processed.

Returns

Not applicable.

Expand source code
def update_historical_analysis(es, domain_name, ip, send, date_time):
    """

    Function that records when a particular domain was queried and by which
    IP address, in an index named after the domain itself.

    Three counters are kept in the document with id 1: queries per minute
    (nested under date, then hour, then minute), queries per day (nested
    under year, then month, then day) and queries per IP address (under
    'count'). The model score is stored under 'status' only when the
    document for a domain is first created.

    Args:
        es: Contains the Elasticsearch object.
        domain_name: Contains the domain name corresponding to which the list
                     will be updated.
        ip: Contains the IP address of the machine querying that domain.
        send: Contains the model prediction score whether the domain is
              malicious or benign.
        date_time: Contains the date as well as the time the request is
                   processed.

    Returns:
        Not applicable.

    """

    calendar_day = date_time.date()
    clock = date_time.time()

    # All keys are strings because they become JSON object keys.
    date = str(calendar_day)
    year, month, day = (str(calendar_day.year), str(calendar_day.month),
                        str(calendar_day.day))
    hour, minutes = str(clock.hour), str(clock.minute)

    # First query for this domain: create its document with every counter
    # starting at 1.
    if domain_name not in es.indices.get('*'):
        es.index(index=domain_name, id=1,
                 body={date: {hour: {minutes: 1}},
                       year: {month: {day: 1}},
                       'count': {ip: 1},
                       'status': send})
        return

    body = es.get(index=domain_name, id=1)['_source']

    # Per-minute counter, creating missing date/hour levels on the way.
    per_minute = body.setdefault(date, {}).setdefault(hour, {})
    per_minute[minutes] = per_minute.get(minutes, 0) + 1

    # Per-day counter, creating missing year/month levels on the way.
    per_day = body.setdefault(year, {}).setdefault(month, {})
    per_day[day] = per_day.get(day, 0) + 1

    # Per-IP counter; the 'count' mapping is expected to exist already.
    per_ip = body['count']
    per_ip[ip] = per_ip.get(ip, 0) + 1

    # Send only the counters touched by this query, plus the IP mapping.
    es.update(index=domain_name, id=1,
              body={'doc': {date: {hour: {minutes: per_minute[minutes]}},
                            year: {month: {day: per_day[day]}},
                            'count': per_ip}})
def vetted_list_creation(es)

Function that creates an empty list for not vetted, benign, honeypot and malicious domains.

Args

es
Contains the Elasticsearch object.

Returns

Not applicable.

Expand source code
def vetted_list_creation(es):
    """

    Function that creates an empty list for not vetted, benign, honeypot and
    malicious domains.

    Each of the 'not_vetted', 'benign_vet', 'honeypot' and 'blacklist'
    indices is checked on its own, and an empty document with id 1 is indexed
    into every one that is missing. Indices that already exist are left
    untouched.

    Args:
        es: Contains the Elasticsearch object.

    Returns:
        Not applicable.

    """
    # Fetch the index listing once instead of once per membership test.
    existing = es.indices.get('*')

    # Checking each index independently means a partially initialised
    # cluster still gets whichever indices are missing.
    for index_name in ('not_vetted', 'benign_vet', 'honeypot', 'blacklist'):
        if index_name not in existing:
            es.index(index=index_name, id=1, body={})