Source code for vent.menus.ntap

import ast
import npyscreen

from vent.api.actions import Action


class CreateNTap(npyscreen.ActionForm):
    """ For creating a new network tap container """

    def create(self):
        """ Build the help text and the input fields of the form """
        self.add_handlers({"^T": self.quit, "^Q": self.quit})
        self.add(npyscreen.Textfield,
                 value='Create a network tap that calls tcpdump and records '
                       'based on the parameters given ',
                 editable=False,
                 color="STANDOUT")
        self.add(npyscreen.Textfield,
                 value='via a POST request '
                       'to the url of the core network tap tool. ',
                 editable=False,
                 color="STANDOUT")
        self.add(npyscreen.Textfield,
                 value='An example payload: ',
                 editable=False,
                 color="STANDOUT")
        # the example has to be a well-formed payload, so there must be a
        # comma between the "interval" and "filter" entries
        self.add(npyscreen.Textfield,
                 value=' {"nic": "eth0", "id": "testId", "interval": "60", '
                       '"filter": "", "iters": "1"} ',
                 editable=False,
                 color="STANDOUT")

        self.nextrely += 1

        # one input per payload key; the widget name doubles as the key
        self.nic = self.add(npyscreen.TitleText, name='nic')
        self.id = self.add(npyscreen.TitleText, name='id')
        self.interval = self.add(npyscreen.TitleText, name='interval')
        self.filter = self.add(npyscreen.TitleText, name='filter')
        self.iters = self.add(npyscreen.TitleText, name='iters')

    def on_ok(self):
        """
        Validate the entered values and POST them to the network tap tool.

        Shows a confirmation with the outcome; the form only switches back
        to MAIN when the request reports success.
        """
        # every field except filter must be filled out
        required = (self.nic, self.id, self.interval, self.iters)
        if not all(field.value for field in required):
            npyscreen.notify_confirm("Please fill out all fields",
                                     form_color='CAUTION')
            return

        # create a dictionary with user entered data, keyed by widget name
        fields = (self.nic, self.id, self.interval, self.filter, self.iters)
        payload = {field.name: field.value for field in fields}

        # create an action object and have it do the work
        self.api_action = Action()
        try:
            # the url is the second item of what get_vent_tool_url returns
            base_url = self.api_action.get_vent_tool_url('network-tap')[1]
            # the payload is sent as the string form of the dictionary
            request = self.api_action.post_request(base_url + '/create',
                                                   str(payload))

            # request[0] is the status flag, request[1] the message to show
            if request[0]:
                npyscreen.notify_confirm("Success: " + str(request[1]))
                self.quit()
            else:
                npyscreen.notify_confirm("Failure: " + str(request[1]))
        except Exception as e:  # pragma: no cover
            npyscreen.notify_confirm("Failure: " + str(e))

    def quit(self, *args, **kwargs):
        """ Overridden to switch back to MAIN form """
        self.parentApp.switchForm("MAIN")

    def on_cancel(self):
        """ When user cancels, return to MAIN """
        self.quit()
class NICsNTap(npyscreen.ActionForm):
    """ For listing all available network interfaces """

    def create(self):
        """
        Build the form and fill it with the interfaces reported by the
        '/nics' endpoint of the network tap tool.
        """
        self.add_handlers({"^T": self.quit, "^Q": self.quit})
        self.add(npyscreen.Textfield,
                 value='List all available network interfaces',
                 editable=False,
                 color="STANDOUT")
        self.nextrely += 1

        try:
            self.api_action = Action()
            # the url is the second item of what get_vent_tool_url returns
            base_url = self.api_action.get_vent_tool_url('network-tap')[1]
            request = self.api_action.get_request(base_url + '/nics')

            # request[0] is the status flag, request[1] the response body
            if request[0]:
                box = self.add(npyscreen.BoxTitle,
                               name="Available Network Interfaces",
                               max_height=40)
                # the body is a python literal whose second item holds the
                # interfaces as newline separated text
                response = ast.literal_eval(str(request[1]))
                box.values = response[1].split("\n")
            else:
                # str() so a non-string error is shown instead of raising
                npyscreen.notify_confirm("Failure: " + str(request[1]))
        except Exception as e:  # pragma: no cover
            npyscreen.notify_confirm("Failure: " + str(e))

    def quit(self, *args, **kwargs):
        """ Overridden to switch back to MAIN form """
        self.parentApp.switchForm("MAIN")

    def on_cancel(self):
        """ When user cancels, return to MAIN """
        self.quit()

    def on_ok(self):
        """ Nothing to submit, so OK simply returns to MAIN """
        self.quit()
class ListNTap(npyscreen.ActionForm):
    """ For listing all network tap capture containers """

    def create(self):
        """
        Build the form and fill it with the containers reported by the
        '/list' endpoint of the network tap tool.
        """
        self.add_handlers({"^T": self.quit, "^Q": self.quit})
        self.add(npyscreen.Textfield,
                 value='List all network tap capture containers',
                 editable=False,
                 color="STANDOUT")
        self.nextrely += 1

        try:
            self.api_action = Action()
            # the url is the second item of what get_vent_tool_url returns
            base_url = self.api_action.get_vent_tool_url('network-tap')[1]
            request = self.api_action.get_request(base_url + '/list')

            # request[0] is the status flag, request[1] the response body
            if request[0]:
                box = self.add(npyscreen.BoxTitle,
                               name="Network Tap Capture Containers",
                               max_height=40)
                # the body is a python literal whose second item holds the
                # containers
                response = ast.literal_eval(str(request[1]))
                box.values = list(response[1])
            else:
                # str() so a non-string error is shown instead of raising
                npyscreen.notify_confirm("Failure: " + str(request[1]))
        except Exception as e:  # pragma: no cover
            npyscreen.notify_confirm("Failure: " + str(e))

    def quit(self, *args, **kwargs):
        """ Overridden to switch back to MAIN form """
        self.parentApp.switchForm("MAIN")

    def on_cancel(self):
        """ When user cancels, return to MAIN """
        self.quit()

    def on_ok(self):
        """ Nothing to submit, so OK simply returns to MAIN """
        self.quit()
class ActionNTap(npyscreen.ActionForm):
    """ Base class to inherit from. """

    def __init__(self, n_action=None, *args, **kwargs):
        """
        Args:
            n_action: action name; it is used in the help text and appended
                to the network tap url as the endpoint, so subclasses must
                pass a string
            *args, **kwargs: handed to the npyscreen form constructor
        """
        # set before the parent constructor runs so that the attribute
        # already exists by the time the form is built
        self.n_action = n_action
        super(ActionNTap, self).__init__(*args, **kwargs)

    def create(self):
        """
        Build the form and offer the containers reported by the '/list'
        endpoint of the network tap tool as a multi select.
        """
        self.add_handlers({"^T": self.quit, "^Q": self.quit})
        self.add(npyscreen.Textfield,
                 value=self.n_action + ' a network tap capture container.',
                 editable=False,
                 color="STANDOUT")
        self.add(npyscreen.Textfield,
                 value='Choose a container to ' + self.n_action,
                 editable=False,
                 color="STANDOUT")
        self.nextrely += 1

        try:
            self.api_action = Action()

            # display all containers by sending a get request to ntap/list
            # nlist returns tuple and get_request returns tuple
            base_url = self.api_action.get_vent_tool_url('network-tap')[1]
            request = self.api_action.get_request(base_url + '/list')

            # create selection for containers
            if request[0]:
                response = ast.literal_eval(str(request[1]))
                containers = list(response[1])
                self.ms = self.add(npyscreen.TitleMultiSelect,
                                   max_height=20,
                                   name='Choose one or more containers to ' +
                                   self.n_action,
                                   values=containers)
            else:
                npyscreen.notify_confirm("Failure: " + str(request[1]))
        except Exception as e:  # pragma: no cover
            npyscreen.notify_confirm("Failure: " + str(e))

    def on_ok(self):
        """
        POST the ids of the selected containers to the endpoint named after
        the action.

        Nothing is sent when no container is selected. The form only
        switches back to MAIN when the request reports success.
        """
        # self.ms is only set when create() managed to list the containers,
        # so a missing widget is treated like an empty selection
        selector = getattr(self, 'ms', None)

        # error check to make sure at least one box was selected; stop here
        # so the request below is never built without a payload
        if selector is None or not selector.value:
            npyscreen.notify_confirm("Please select at least one container.",
                                     form_color='CAUTION')
            return

        # format the data into something ncontrol likes
        payload = {'id': [x['id'] for x in selector.get_selected_objects()]}

        # grab the url that network-tap is listening to
        try:
            npyscreen.notify_wait("Please wait. Currently working")
            self.api_action = Action()
            base_url = self.api_action.get_vent_tool_url('network-tap')[1]
            url = base_url + "/" + self.n_action
            # NOTE(review): CreateNTap sends str(payload) while the dict
            # itself is sent here - confirm which one post_request expects
            request = self.api_action.post_request(url, payload)

            # request[0] is the status flag, request[1] the message to show
            if request[0]:
                npyscreen.notify_confirm("Success: " + str(request[1]))
                self.quit()
            else:
                npyscreen.notify_confirm("Failure: " + str(request[1]))
        except Exception as e:  # pragma: no cover
            npyscreen.notify_confirm("Failure: " + str(e))

    def quit(self, *args, **kwargs):
        """ Overridden to switch back to MAIN form """
        self.parentApp.switchForm("MAIN")

    def on_cancel(self):
        """ When user cancels, return to MAIN """
        self.quit()
class DeleteNTap(ActionNTap):
    """ ActionNTap form bound to the 'delete' action """

    def __init__(self, *args, **kwargs):
        # forward the form options instead of dropping them, so settings
        # given by the caller (e.g. the form name) reach the base form
        super(DeleteNTap, self).__init__('delete', *args, **kwargs)
class StartNTap(ActionNTap):
    """ ActionNTap form bound to the 'start' action """

    def __init__(self, *args, **kwargs):
        # forward the form options instead of dropping them, so settings
        # given by the caller (e.g. the form name) reach the base form
        super(StartNTap, self).__init__('start', *args, **kwargs)
class StopNTap(ActionNTap):
    """ ActionNTap form bound to the 'stop' action """

    def __init__(self, *args, **kwargs):
        # forward the form options instead of dropping them, so settings
        # given by the caller (e.g. the form name) reach the base form
        super(StopNTap, self).__init__('stop', *args, **kwargs)