Source code for intake_splunk.core


import base64
import io
import requests
import pandas as pd
import time
import warnings
from intake.source import base

from . import __version__

# because Splunk connections are against a self-signed cert, all connections
# would raise a warning
warnings.filterwarnings('ignore', module='urllib3.connectionpool')
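The filter above silences all warnings coming from urllib3's connection pool for this process. A narrower alternative, sketched here only for illustration (it is not what the module does), is to disable just the insecure-request warning that self-signed certificates trigger:

import urllib3
# suppress only the warning raised for unverified HTTPS requests
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)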


class SplunkSource(base.DataSource):
    """Execute a query on Splunk

    Parameters
    ----------
    query : str
        String to pass to Splunk for execution. If it does not start with
        "|" or "search", "search" will be prepended.
    url : str
        Endpoint on which to reach splunk, including protocol and port.
    auth : (str, str) or str
        Username/password pair to authenticate with, or a pre-fetched
        session key.
    chunksize : int
        Number of rows per partition when reading results.
    """
    container = 'dataframe'
    version = __version__
    name = 'splunk'
    partition_access = True

    def __init__(self, query, url, auth, chunksize=5000, metadata=None):
        self.url = url
        self.auth = auth
        self.query = query
        self.chunksize = chunksize
        self._df = None
        super(SplunkSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        if self._df is None:
            # this waits until the query is ready, but Splunk has a
            # results_preview end-point which can be fetched while the
            # query is still running
            self.splunk = SplunkConnect(self.url)
            if isinstance(self.auth, (tuple, list)):
                self.splunk.auth(*self.auth)
            else:
                self.splunk.auth_head(key=self.auth)
            self._df = self.splunk.read_dask(self.query, self.chunksize)
        self.npartitions = self._df.npartitions
        return base.Schema(datashape=None,
                           dtype=self._df,
                           shape=(None, len(self._df.columns)),
                           npartitions=self.npartitions,
                           extra_metadata={})

    def _get_partition(self, i):
        return self._df.get_partition(i).compute()

    def to_dask(self):
        self.discover()
        return self._df
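As a rough usage sketch (not part of the module): with the plugin installed, intake registers this driver under the name 'splunk', so a source could be created roughly as below; the endpoint, query and credentials are placeholders.

import intake

source = intake.open_splunk(query='index=main | head 1000',   # placeholder query
                            url='https://localhost:8089',      # Splunk management port
                            auth=('admin', 'changeme'),        # or a session key string
                            chunksize=5000)
df = source.read()        # eager: all partitions concatenated into one pandas frame
ddf = source.to_dask()    # lazy: dask dataframe with one partition per chunk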
class SplunkConnect:
    """
    Talk to Splunk over REST, download data to dataframes

    If there is no pre-fetched auth key, must call ``.auth(user, pw)`` to
    establish credentials, or ``.auth_head(user=user, pw=pw)`` to use simple
    auth on all calls.

    Main user methods: read_pandas, read_pandas_iter, read_dask

    Parameters
    ----------
    base_url: str
        Address to contact Splunk on, e.g., ``https://localhost:8089``
    key: str
        Auth key, if known
    """
    POLL_TIME = 1   # seconds to sleep between successive polls
    TIMEOUT = 600   # maximum seconds to wait for query to finish

    def __init__(self, base_url, key=None):
        self.url = base_url
        self.key = key
        self.head = None
        if key:
            self.auth_head(key)

    def auth(self, user, pw):
        """ Login to splunk and get a session key """
        url = self.url + '/services/auth/login?output_mode=json'
        r = requests.post(url, verify=False,
                          data={'username': user, 'password': pw})
        self.key = r.json()['sessionKey']
        self.auth_head(self.key)

    def auth_head(self, key=None, user=None, pw=None):
        """ Make header either by session key or by user/pass """
        if key:
            self.head = {'Authorization': 'Splunk %s' % key}
        elif user is None and pw is None:
            raise ValueError('Must supply key or user/password')
        else:
            code = "%s:%s" % (user, pw)
            self.head = {'Authorization': 'Basic %s' % base64.b64encode(
                code.encode()).decode()}

    @staticmethod
    def _sanitize_query(q):
        """ Ensure that all queries are actually valid searches """
        q = q.strip()
        if not q.startswith('search') and not q.startswith('|'):
            return "search " + q
        return q

    def list_saved_searches(self):
        """ Get saved search names/definitions as a dict """
        r = requests.get(
            self.url + '/services/saved/searches?output_mode=json',
            headers=self.head, verify=False)
        out = r.json()['entry']
        return {o['name']: o['content']['search'] for o in out}

    def start_query(self, q):
        """ Initiate a query as a job """
        q = self._sanitize_query(q)
        # opportunity to pass extra args here, especially job timeout
        # http://docs.splunk.com/Documentation/Splunk/6.2.6/RESTREF/RESTsearch#POST_search.2Fjobs_method_detail
        r = requests.post(self.url + '/services/search/jobs?output_mode=json',
                          verify=False, data={'search': q}, headers=self.head)
        return r.json()['sid']

    def poll_query(self, sid):
        """ Check the status of a job """
        path = '/services/search/jobs/{}?output_mode=json'.format(sid)
        r = requests.get(self.url + path, verify=False, headers=self.head)
        out = r.json()['entry'][0]['content']
        # why not pass all job details?
        return out['isDone'], out.get('resultCount', 0)

    def wait_poll(self, sid):
        # instead of polling, job could be started in exec_mode="blocking"
        time0 = time.time()
        while True:
            done, count = self.poll_query(sid)
            if done:
                return done, count
            if time.time() - time0 > self.TIMEOUT:
                raise RuntimeError("Timeout waiting for Splunk "
                                   "to finish query")
            time.sleep(self.POLL_TIME)

    def get_query_result(self, sid, offset=0, count=0):
        """ Fetch query output (as CSV) """
        # could potentially be streaming download
        path = ('/services/search/jobs/{}/results/?output_mode=csv'
                '&offset={}&count={}').format(sid, offset, count)
        r = requests.get(self.url + path, verify=False, headers=self.head)
        return r.content

    def get_dataframe(self, sid, offset=0, count=0, **kwargs):
        """ Read a chunk from completed query, return a pandas dataframe

        Parameters
        ----------
        sid: str
            The job's ID
        offset: int
            Starting row
        count: int
            Number of rows to fetch
        kwargs: passed to pd.read_csv
        """
        # Since we know the count, could pre-allocate df and set values in
        # chunks while streaming the download
        txt = self.get_query_result(sid, offset, count)
        return pd.read_csv(io.BytesIO(txt), **kwargs)

    def read_pandas(self, q, **kwargs):
        """ Start query, wait for completion and download data as a dataframe

        Parameters
        ----------
        q: str
            Valid Splunk query
        kwargs: passed to pd.read_csv
        """
        sid = self.start_query(q)
        self.wait_poll(sid)
        return self.get_dataframe(sid, **kwargs)

    def read_pandas_iter(self, q, chunksize, **kwargs):
        """ Start query, wait for completion and make an iterator of dataframes

        Parameters
        ----------
        q: str
            Valid Splunk query
        chunksize: int
            Number of rows in each dataframe
        kwargs: passed to pd.read_csv
        """
        sid = self.start_query(q)
        done, count = self.wait_poll(sid)
        for i in range(0, count, chunksize):
            yield self.get_dataframe(sid, offset=i, count=chunksize, **kwargs)

    def read_dask(self, q, chunksize, **kwargs):
        """ Start query, wait for completion, return lazy dask dataframe.

        This does download the first 20 rows in this thread, to infer dtypes.

        Parameters
        ----------
        q: str
            Valid Splunk query
        chunksize: int
            Number of rows in each dataframe
        kwargs: passed to pd.read_csv
        """
        from dask import delayed
        import dask.dataframe as dd
        sid = self.start_query(q)
        done, count = self.wait_poll(sid)
        meta = self.get_dataframe(sid, count=20)[:0]
        parts = [delayed(self.get_dataframe)(sid, offset=i, count=chunksize,
                                             **kwargs)
                 for i in range(0, count, chunksize)]
        return dd.from_delayed(parts, meta=meta)
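For completeness, a sketch of driving SplunkConnect directly, outside of intake; the URL, credentials and query are placeholders, and process() stands in for whatever per-chunk handling a caller would do.

conn = SplunkConnect('https://localhost:8089')
conn.auth('admin', 'changeme')               # or SplunkConnect(url, key=session_key)

df = conn.read_pandas('error | head 500')    # "search " is prepended automatically

for chunk in conn.read_pandas_iter('error | head 500', chunksize=100):
    process(chunk)                           # hypothetical per-chunk handler

ddf = conn.read_dask('error | head 500', chunksize=100)   # lazy dask dataframe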