# Source code for queryutils.csvparser
import csv
import dateutil.parser
import os
import splparser.parser
from user import *
from query import *
from logging import getLogger as get_logger
from os import path
from splparser.exceptions import SPLSyntaxError, TerminatingSPLSyntaxError
# Number of bytes in one megabyte.
BYTES_IN_MB = 1048576
# Default cap on how many bytes of .csv data get_csv_files will collect
# (approximately 2000 MB); mainly useful for limiting reads during testing.
LIMIT = 2000*BYTES_IN_MB
# Module-level logger shared by all functions in this module.
logger = get_logger("queryutils")
def _decode(value):
    """Return value as a unicode string, or None if value is None."""
    if value is None:
        return None
    return unicode(value.decode("utf-8"))

def _parse_float(value):
    """Return value parsed as a float, or None if missing or unparsable."""
    if value is None:
        return None
    try:
        return float(value.decode("utf-8"))
    except (ValueError, UnicodeError, AttributeError):
        # Malformed numeric fields are treated as absent, matching the
        # best-effort behavior of the original parser.
        return None

def _get_or_create_user(row, users):
    """Return the User for this row, creating and registering it if new.

    Users are keyed in the users dict by "username.case" when both fields
    are present, by username alone when there is no case, and by "" when
    neither is present.
    """
    username = _decode(row.get('user', None))
    case = _decode(row.get('case_id', None))
    if username is not None and case is not None:
        userhash = ".".join([username, case])
    elif username is not None:
        userhash = username
    else:
        userhash = ""
    user = users.get(userhash, None)
    if user is None:
        user = User(username)
        users[userhash] = user
    user.case_id = case
    return user

def _parse_query(row):
    """Build and return a Query populated from one CSV row."""
    timestamp = row.get('_time', None)
    if timestamp is not None:
        # Convert the ISO-ish timestamp string to epoch seconds as a float.
        timestamp = float(dateutil.parser.parse(timestamp).strftime('%s.%f'))
    querystring = row.get('search', None)
    if querystring is not None:
        querystring = _decode(querystring).strip()
    query = Query(querystring, timestamp)
    # The runtime column appears under two different names across exports.
    runtime = row.get('runtime', None)
    if runtime is None:
        runtime = row.get('total_run_time', None)
    query.execution_time = _parse_float(runtime)
    query.earliest_event = _parse_float(row.get('search_et', None))
    query.latest_event = _parse_float(row.get('search_lt', None))
    query.range = _parse_float(row.get('range', None))
    # is_realtime arrives as the strings "true"/"false"; other values
    # (including None) are passed through unchanged.
    is_realtime = row.get('is_realtime', None)
    if is_realtime == "false":
        is_realtime = False
    elif is_realtime == "true":
        is_realtime = True
    query.is_realtime = is_realtime
    # The search type column also appears under two names across exports.
    searchtype = row.get('searchtype', None)
    if searchtype is None:
        searchtype = row.get('search_type', None)
    query.search_type = _decode(searchtype)
    if query.search_type == "adhoc":
        query.is_interactive = True
    query.splunk_search_id = _decode(row.get('search_id', None))
    query.saved_search_name = _decode(row.get('savedsearch_name', None))
    return query

def get_users_from_file(filename, users):
    """Populate the users dictionary with users and their queries from the given file.

    :param filename: The .csv file containing user queries
    :type filename: str
    :param users: The user dict into which to place the users
    :type users: dict
    :rtype: None
    """
    logger.debug("Reading from file:" + filename)
    with open(filename) as datafile:
        reader = csv.DictReader(datafile)
        for row in reader:
            logger.debug("Attempting to read row.")
            user = _get_or_create_user(row, users)
            query = _parse_query(row)
            # Tie the query and the user together.
            user.queries.append(query)
            query.user = user
            logger.debug("Successfully read query.")
def get_users_from_directory(directory, users, limit=LIMIT):
    """Populate the users dict with users from the .csv files.

    :param directory: The path to the directory containing the .csv files
    :type directory: str
    :param users: The dict to contain the users read from the .csv files
    :type users: dict
    :param limit: The approximate number of bytes to read in (for testing)
    :type limit: int
    :rtype: None
    """
    # Collect the data files first, then parse each one into the users dict.
    for datafile in get_csv_files(directory, limit=limit):
        get_users_from_file(datafile, users)
def get_csv_files(dir, limit=LIMIT):
    """Return the paths to all the .csv files under the given directory.

    Walks the tree rooted at dir and stops collecting once the cumulative
    size of the collected files exceeds limit.

    :param dir: The path to the given directory
    :type dir: str
    :param limit: The approximate number of bytes to read in (for testing)
    :type limit: int
    :rtype: list
    """
    csv_files = []
    bytes_added = 0
    for (dirpath, dirnames, filenames) in os.walk(dir):
        for filename in filenames:
            if filename.endswith('.csv'):
                # Join against dirpath (the directory os.walk found the file
                # in), not the walk root: files in subdirectories otherwise
                # resolve to nonexistent paths and getsize raises OSError.
                full_filename = path.join(path.abspath(dirpath), filename)
                csv_files.append(full_filename)
                bytes_added += path.getsize(full_filename)
                if bytes_added > limit:
                    return csv_files
    return csv_files