import os
import sys
import time
import uuid
from redis import Redis
from redis import StrictRedis
from rq import Queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
class GZHandler(PatternMatchingEventHandler):
    """
    Handles when an event on the directory being watched happens that matches
    the values in patterns.

    Files that are created in (or moved into) the watched directory are
    enqueued as ``watch.file_queue`` jobs, unless a job referencing the same
    path is already present in redis.
    """
    patterns = ["*"]
    # want to ignore certain pcap files from splitter as they contain junk
    ignore_patterns = ["*-miscellaneous*"]
    # the attribute used to be misspelled, which meant the ignore list was
    # never applied; the old name is kept as an alias for compatibility
    ignore_patters = ignore_patterns
    # don't want to process files in on_modified for files that have already
    # been created and processed (class-level, so shared by all instances)
    created_files = set()
    try:
        # let jobs run for up to one day
        q = Queue(connection=Redis(host='redis'), default_timeout=86400)
        r = StrictRedis(host='redis', port=6379, db=0)
    except Exception as e:  # pragma: no cover
        print("Unable to connect to redis:", str(e))

    @staticmethod
    def _to_text(value):
        """
        Return a redis reply as text; redis clients hand back bytes unless
        they were created with decode_responses=True. None stays None.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return value

    @staticmethod
    def _queued_path(description, hostname):
        """
        Extract the file path referenced by a job description, or None if
        the description is not a recognised watch job for this hostname.
        """
        if description.startswith("watch.file_queue('"):
            pieces = description.split("watch.file_queue('" + hostname + "_")
            if len(pieces) > 1:
                # drop the trailing "')" of the call representation
                return pieces[1][:-2]
            # queued under a different hostname prefix
            return None
        if description.startswith("watch.gpu_queue('"):
            pieces = description.split('"file": "')
            if len(pieces) > 1:
                return pieces[1].split('"')[0]
        return None

    @staticmethod
    def _wait_for_copy(path):
        """
        Block until the size of path stops changing between two checks that
        are 0.1 seconds apart, so long copies can finish first.
        """
        previous_size = -1
        current_size = os.path.getsize(path)
        while previous_size != current_size:
            previous_size = current_size
            time.sleep(0.1)
            current_size = os.path.getsize(path)

    def _already_queued(self, uid, hostname, path):
        """
        Return True if any ``rq:job*`` entry in redis already references
        path. uid only prefixes the debug output.
        """
        exists = False
        for job in self.r.keys(pattern="rq:job*"):
            print(uid + " ***")
            description = self._to_text(self.r.hget(job, 'description'))
            # entries without a description cannot reference the file
            if description is not None:
                print(uid + " " + description)
                queued = self._queued_path(description, hostname)
                if queued is not None:
                    print(uid + " " + queued)
                    print(uid + " " + path)
                    if queued == path:
                        print(uid + " true")
                        exists = True
            print(uid + " ***")
        return exists

    def process(self, event):
        """
        Queue the file behind event for processing, unless it is a directory
        or has already been queued.

        event.event_type
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file

        Any error is reported on stdout and swallowed, so that a single bad
        file drop does not propagate out of the event callback.
        """
        uid = str(uuid.uuid4())
        hostname = os.environ.get("VENT_HOST") or ""

        try:
            # TODO should directories be treated as bulk paths to send to a
            #      plugin?
            if not event.is_directory:
                # wait for long copies to finish
                self._wait_for_copy(event.src_path)

                # check if the file was already queued and ignore
                print(uid+" started " + event.src_path)
                if not self._already_queued(uid, hostname, event.src_path):
                    # !! TODO this should be a configuration option in the
                    #         vent.template
                    print(uid + " let's queue it " + event.src_path)
                    # let jobs be queued for up to 30 days
                    self.q.enqueue('watch.file_queue',
                                   hostname + "_" + event.src_path,
                                   ttl=2592000)
                print(uid + " end " + event.src_path)
        except Exception as e:  # pragma: no cover
            print("file drop error: " + str(e))

    def on_created(self, event):
        """Remember the created path and hand the event to process()."""
        self.created_files.add(event.src_path)
        self.process(event)

    def on_modified(self, event):
        """
        Process a modification only when the path was not seen before and
        still exists.
        """
        # don't perform any action if file was already created or file is
        # deleted
        if (event.src_path not in self.created_files and
                os.path.exists(event.src_path)):
            # add to created files because the file was moved into directory,
            # which is what should be creating it, but some OS's treat it as a
            # modification with docker mounts
            self.created_files.add(event.src_path)
            self.process(event)
if __name__ == '__main__':  # pragma: no cover
    # the first command line argument, when given, replaces the default
    # directory to watch
    cli_args = sys.argv[1:]
    watch_path = cli_args[0] if cli_args else '/files'

    observer = Observer()
    observer.schedule(GZHandler(), path=watch_path, recursive=True)
    observer.start()

    # keep the main thread alive until interrupted; the observer does its
    # work in the background once started
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:  # pragma: no cover
        observer.stop()

    observer.join()