Scribe Readers

Classes which read log data from scribe.

class clog.readers.CLogStreamIterator(stream_reader, line_num=-1, current_chunk=None, chunk_line_num=-1)

Iterator used by CLogStreamReader for iterating over the lines in each chunk of a stream.

class clog.readers.CLogStreamReader(stream_name, stream_dir, date, fail_on_missing=False)

Make a stream reader for a day of clog entries.

clog entries are stored by stream name and date and split into separate chunks. Each chunk may be compressed with gzip or bzip, or stored as plaintext.

For instance, the entries for a stream called ‘foo’ on New Year's Day 2009 will be laid out in the file system like this:

STREAM_DIR/foo/foo-2009-01-01_00000.gz
STREAM_DIR/foo/foo-2009-01-01_00001.gz
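
The layout above can be sketched as a small path-building helper. This is only an illustration: `chunk_path` is a hypothetical function, not part of clog, and the five-digit chunk index and the default `.gz` suffix are inferred from the two example filenames.

```python
import posixpath
from datetime import date


def chunk_path(stream_dir, stream_name, day, chunk_index, suffix='.gz'):
    """Build a chunk path shaped like STREAM_DIR/foo/foo-2009-01-01_00000.gz.

    Hypothetical helper for illustration; not part of clog.
    """
    # Stream name, ISO date, and zero-padded chunk index, as in the example names.
    filename = '%s-%s_%05d%s' % (stream_name, day.isoformat(), chunk_index, suffix)
    # Chunks live in a subdirectory named after the stream.
    return posixpath.join(stream_dir, stream_name, filename)


print(chunk_path('STREAM_DIR', 'foo', date(2009, 1, 1), 0))
```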

Example usage:

from datetime import date
from clog.readers import CLogStreamReader

reader = CLogStreamReader('stream_name', '/path/to/logs', date.today())
for line in reader:
    print(line)
Parameters:
  • stream_dir – the stream directory like /storage/coraid5/scribe_logs
  • stream_name – the stream name like biz_views
  • date – the date of the logs
  • fail_on_missing – Fail if there are no log files for the specified stream and date
chunk_filenames()

Get an iterator for all the chunk filenames
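
Because chunks may be gzip-compressed, bzip-compressed, or plaintext, code that works with chunk filenames directly has to pick a decompressor per file. The sketch below shows one way to do that. `open_chunk` and `iter_lines` are hypothetical helpers, not clog functions, and mapping the `.gz` and `.bz2` suffixes to a compression format is an assumption; CLogStreamReader may detect the format differently.

```python
import bz2
import gzip


def open_chunk(path):
    """Open a chunk for text reading, choosing a decompressor by suffix.

    Assumption: '.gz' means gzip and '.bz2' means bzip2; anything else
    is treated as plaintext.
    """
    if path.endswith('.gz'):
        return gzip.open(path, 'rt')
    if path.endswith('.bz2'):
        return bz2.open(path, 'rt')
    return open(path, 'r')


def iter_lines(chunk_filenames):
    """Yield every line from each chunk, in the order the filenames are given."""
    for path in chunk_filenames:
        with open_chunk(path) as chunk:
            for line in chunk:
                yield line
```

With a real reader, the argument to `iter_lines` would be whatever `chunk_filenames()` returns; iterating over the reader itself remains the simpler option.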

class clog.readers.NetCLogStreamReader(bufsize=1024, host=None, port=None, automagic_recovery=False, localS3=False)

Read logs from a scribe server

Note

This reader streams logs directly from the source, so it is not recommended for large logs. Use mrjob instead.

Example usage:

from datetime import date
from clog.readers import NetCLogStreamReader

stream_reader = NetCLogStreamReader()
with stream_reader.read_date_range(
        'ranger',
        date(2010, 1, 1),
        date(2010, 12, 31)
) as reader:
    for line in reader:
        print(line)

Configuration:

This class can be configured either by passing a host and port to the constructor, or by using staticconf with the following settings:

scribe_net_reader.host
hostname of the scribe server used to stream scribe logs
scribe_net_reader.port
port of the scribe server used to stream scribe logs
Parameters:
  • bufsize – How many bytes to buffer internally
  • host – The host to connect to (defaults to scribe_net_reader.host)
  • port – The port to connect to (defaults to scribe_net_reader.port)
  • automagic_recovery – Whether to tail the stream, continuously retrying the connection (defaults to False)
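
The fallback described for host and port (explicit constructor arguments first, then the scribe_net_reader settings) can be sketched as follows. `resolve_endpoint` is a hypothetical helper, a plain dict stands in for staticconf to keep the example self-contained, and the hostnames and port numbers are placeholders; none of this is clog's actual implementation.

```python
def resolve_endpoint(host=None, port=None, config=None):
    """Return (host, port), preferring explicit arguments over configured values.

    `config` is a plain dict standing in for staticconf (an assumption of
    this sketch).
    """
    config = config or {}
    if host is None:
        host = config.get('scribe_net_reader.host')
    if port is None:
        port = config.get('scribe_net_reader.port')
    if host is None or port is None:
        raise ValueError('no host/port given and none configured')
    return host, port


# Placeholder values for illustration only.
settings = {
    'scribe_net_reader.host': 'scribe.example.com',
    'scribe_net_reader.port': 1234,
}
print(resolve_endpoint(config=settings))
print(resolve_endpoint(host='other.example.com', config=settings))
```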
list_streams()

Get a context manager to use for reading the list of stream names

read_date_range(stream_name, start_date, end_date)

Get a context manager to use for reading a stream for a date range

exception clog.readers.NoLogDataError
class clog.readers.StreamTailer(stream, host=None, port=None, bufsize=4096, automagic_recovery=True, add_newlines=True, raise_on_start=True, timeout=None, reconnect_callback=None, use_kafka=False, lines=None, protocol_opts=None)

Tail a Scribe stream from a tailing server

Example Usage:

from clog.readers import StreamTailer

tailer = StreamTailer('stream_name', host, port)
for line in tailer:
    print(line)

Configuration:

This class can be configured by passing a host and port to the constructor or by using staticconf with the following setting:

scribe_tail_services
(list of dicts {‘host’: host, ‘port’: port}) list of host and port addresses of scribe endpoints for tailing logs in real time.
Parameters:
  • stream (string) – the name of the stream, like ‘ranger’
  • host (string) – the host name
  • port – the port to connect to
  • bufsize – the number of bytes to buffer
  • automagic_recovery (bool) – continue to retry connection forever
  • add_newlines – add newlines to the items yielded by the iterator
  • raise_on_start (bool) – raise an error if the connection is dropped immediately after starting (otherwise, return silently); defaults to True
  • timeout (int) – connection timeout
  • reconnect_callback (function) – callback called when reconnecting
  • protocol_opts (dict) – optional protocol parameters
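
To make the shape of the scribe_tail_services setting concrete, the sketch below builds a list of host/port dicts and resolves an endpoint from it. `pick_tail_endpoint` is a hypothetical helper, the hostnames and ports are placeholders, and taking the first configured entry is an assumption; how StreamTailer actually selects among several services is not stated here.

```python
def pick_tail_endpoint(host=None, port=None, scribe_tail_services=None):
    """Return (host, port) for tailing.

    Explicit arguments win; otherwise fall back to the first configured
    service. Choosing the first entry is an assumption of this sketch.
    """
    if host is not None and port is not None:
        return host, port
    if not scribe_tail_services:
        raise ValueError('no host/port given and scribe_tail_services is empty')
    service = scribe_tail_services[0]
    return service['host'], service['port']


# The setting is a list of dicts, each with 'host' and 'port' keys.
services = [
    {'host': 'tail1.example.com', 'port': 3535},
    {'host': 'tail2.example.com', 'port': 3535},
]
print(pick_tail_endpoint(scribe_tail_services=services))
```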
list_streams()

Get a context manager to use for reading the list of stream names

exception clog.readers.StreamTailerSetupError(host, port, message)
clog.readers.construct_conn_msg(stream, lines=None, protocol_opts=None)

Return a connection message

Parameters:
  • stream – stream name
  • lines – number of messages to consume
  • protocol_opts – optional arguments
clog.readers.get_s3_info(hostname, stream_name=None)

Returns (s3_host, s3_bucket(s), s3_prefix)

If no stream name is provided (i.e. None), both normal and tmp buckets are returned as a dict.