CardoneNetworking.com
 

Run BGP on a Linux Box

This script runs on a linux machine and will establish a BGP session with a router.

        
#!/usr/bin/python3
import socket
import ipaddress
import struct
import threading
from time import sleep
from tkinter import *


#variables
hold_time = 90
version = 4
opts = b''
auth = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
old_routes = []
peers = {}
close = 1


#GUI stuff
myWindow = Tk()
myWindow.geometry('380x600')
myWindow.title('BGP Wizard')

peerIP_label = Label(myWindow,text='Peer IP')
peerIP_label.pack()
peerIP_label.config(font=('verdana', 12))

peerIP = Entry(myWindow)
peerIP.pack(ipady=3)

myIP_label = Label(myWindow,text='My IP')
myIP_label.pack()
myIP_label.config(font=('verdana', 12))

myIP = Entry(myWindow)
myIP.pack(ipady=3)

myAS_label = Label(myWindow,text='my AS Number')
myAS_label.pack()
myAS_label.config(font=('verdana', 12))

myAS = Entry(myWindow)
myAS.pack(ipady=3)

myASPath_label = Label(myWindow,text='AS Path to Advertise')
myASPath_label.pack()
myASPath_label.config(font=('verdana', 12))

myASPath = Entry(myWindow)
myASPath.pack(ipady=3)

localPref_label = Label(myWindow,text='Local Preference')
localPref_label.pack()
localPref_label.config(font=('verdana', 12))

localPref = Entry(myWindow)
localPref.pack(ipady=3)

routes_label = Label(myWindow,text='Routes to Advertise')
routes_label.pack()
routes_label.config(font=('verdana', 12))

text_box = Text(myWindow, height=12,width=30,)
text_box.pack()


#functions
def close_window():
    #this function is for when the user closes the window
    global close
    close = 0
    myWindow.destroy()

def packSubnet(network):
    #this function is for the network advertisement
    data = b''
    temp_data = b''
    binNetwork = struct.pack("!I", int(ipaddress.IPv4Network(network)[0]))
    for x in struct.iter_unpack("B", binNetwork):
        if x[0]:
            data += temp_data #incase 0 bits in the middle
            data += struct.pack("B", x[0])
        else:
            temp_data += struct.pack("B", x[0])
    return data

def keepalive():
    #this thread is for sending keepalives
    global bgp_socket
    while close:
        data = auth + struct.pack("!Hb", 19, 4)
        bgp_socket.send(data)
        print("keepalive sent")
        sleep(30)

def recBGPupdates():
    #this thread is for BGP message communication
    global bgp_socket
    global peers
    global close
    while close:
        auth = bgp_socket.recv(16)
        length = struct.unpack("!H", bgp_socket.recv(2))
        data = bgp_socket.recv(length[0] - 18)
        (type, ) = struct.unpack("b", data[:1]) #get bgp message type
        if type == 1:
            #open
            print("received an open message")
            (bgp_version, peer_as, hold_time, peer_router_id, opts_length) = struct.unpack("!b2HIb", data[1:11])
        elif type == 2:
            #update
            (withdraw_len, attr_len) = (struct.unpack("!2H", data[1:5]))
        elif type == 3:
            #notification
            (major_err, minor_err) = (struct.unpack("2b", data[1:3]))
            print("notification received")
            close = 0
            break
        elif type == 4:
            #keepalive
            print("keepalive received")

def handleBGP():
    #this function is for establishing the bgp session
    global bgp_socket
    global close
    peer_ip = peerIP.get()
    my_as = int(myAS.get())
    my_ip = myIP.get()
    close = 1
    router_id = int(ipaddress.ip_address(my_ip))
    if(ipaddress.ip_address(my_ip) < ipaddress.ip_address(peer_ip)):
        #we initiate
        mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        mysocket.bind((my_ip, 179))
        mysocket.listen(1)
        (bgp_socket, client_ip) = mysocket.accept()
    else:
        #we are the client
        bgp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        bgp_socket.connect((peer_ip, 179))
        client_ip = peer_ip
    
    updates_thread = threading.Thread(target=recBGPupdates)
    updates_thread.start()

    data = struct.pack("!bbHHI", 1, version, my_as, hold_time, router_id)
    opts_length = len(opts)
    data = data + struct.pack("b", opts_length) + opts
    length = len(data) + 18
    data = auth + struct.pack("!H", length) + data
    bgp_socket.send(data)

    keepalive_thread = threading.Thread(target=keepalive)
    keepalive_thread.start()

def advertiseRoutes():
    #this function is for advertising/withdrawing routes
    global old_routes
    global bgp_socket
    next_hop = myIP.get()
    as_path = myASPath.get().split()
    local_pref = int(localPref.get())
    new_routes = text_box.get('1.0', 'end').splitlines()
    withdrawn_routes = set(old_routes) - set(new_routes)
    old_routes = new_routes
    data = b'\x02'
    with_data = b''
    if(withdrawn_routes):
        for prefix in withdrawn_routes:
            if(ipaddress.ip_network(prefix)):
                line = prefix.split('/')
                with_data += struct.pack("b", int(line[1]))
                with_data += packSubnet(prefix)
    data += struct.pack("!H", len(with_data))
    data += with_data
    adv_data = b''
    as_data = b'\x02' #set AS segment type. used later
    adv_data += b'\x40\x01\x01\x00' #origin IGP
    adv_data += b'\x40\x02' #AS path
    as_data += struct.pack("b", len(as_path))
    for asn in as_path:
        as_data += struct.pack("!H", int(asn))
    adv_data += struct.pack("b", len(as_data)) + as_data
    adv_data += b'\x40\x03\x04' #next hop #ipv4
    adv_data += struct.pack("!I", int(ipaddress.ip_address(next_hop))) #for ipv4 only
    adv_data += b'\x40\x05\x04' #local pref
    adv_data += struct.pack("!I", local_pref)
    data += struct.pack("!H", len(adv_data))
    data += adv_data
    for prefix in new_routes:
        if(ipaddress.ip_network(prefix)):
            line = prefix.split('/')
            data += struct.pack("b", int(line[1]))
            subnet = packSubnet(prefix)
            data += subnet
    data = auth + struct.pack("!H", len(data) + 18) + data
    bgp_socket.send(data)
    print("update sent")


start_session = Button(myWindow,text='Establish BGP', command=handleBGP)
start_session.pack()

send_update = Button(myWindow,text='Send Update', command=advertiseRoutes)
send_update.pack()

myWindow.protocol("WM_DELETE_WINDOW", close_window)
myWindow.mainloop()