Scribe Readers¶
Classes which read log data from scribe.
class clog.readers.CLogStreamIterator(stream_reader, line_num=-1, current_chunk=None, chunk_line_num=-1)¶

    Iterator used by CLogStreamReader for iterating over the lines of the chunks of a stream.
class clog.readers.CLogStreamReader(stream_name, stream_dir, date, fail_on_missing=False)¶

    Make a stream reader for a day of clog entries. clog entries are stored
    by stream name and date, broken into separate chunks which may be
    compressed with gzip or bzip2, or left as plaintext.

    For instance, the entries for a stream called 'foo' on New Year's Day
    2009 will be laid out in the file system like:

        STREAM_DIR/foo/foo-2009-01-01_00000.gz
        STREAM_DIR/foo/foo-2009-01-01_00001.gz
        ...

    Example usage:

        reader = CLogStreamReader('stream_name', '/path/to/logs', date.today())
        for line in reader:
            print(line)
    Parameters:
        - stream_name – the stream name, like biz_views
        - stream_dir – the stream directory, like /storage/coraid5/scribe_logs
        - date – the date of the logs
        - fail_on_missing – fail if there are no log files for the specified stream and date
    chunk_filenames()¶

        Get an iterator over all the chunk filenames.
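The chunk naming scheme described above can be sketched in plain Python. This helper is illustrative only (it is not part of clog); it just reproduces the documented STREAM_DIR/name/name-YYYY-MM-DD_NNNNN.gz layout:

```python
from datetime import date

# Hypothetical helper (not part of clog) mirroring the documented
# on-disk layout: STREAM_DIR/<name>/<name>-YYYY-MM-DD_NNNNN.gz
def chunk_path(stream_dir, stream_name, day, chunk_index, suffix='.gz'):
    filename = '%s-%s_%05d%s' % (stream_name, day.isoformat(), chunk_index, suffix)
    return '%s/%s/%s' % (stream_dir, stream_name, filename)

print(chunk_path('STREAM_DIR', 'foo', date(2009, 1, 1), 0))
# STREAM_DIR/foo/foo-2009-01-01_00000.gz
```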
class clog.readers.NetCLogStreamReader(bufsize=1024, host=None, port=None, automagic_recovery=False, localS3=False)¶

    Read logs from a scribe server.

    Note

    This reader streams logs from the source; it is not recommended for
    large logs. Use an mrjob instead.

    Example usage:

        stream_reader = NetCLogStreamReader()
        with stream_reader.read_date_range(
            'ranger', date(2010, 1, 1), date(2010, 12, 31)
        ) as reader:
            for line in reader:
                print(line)
    Configuration:

    This class can be configured either by passing a host and port to the
    constructor, or by using staticconf with the following settings:

        scribe_net_reader.host
            hostname of the scribe server used to stream scribe logs
        scribe_net_reader.port
            port of the scribe server used to stream scribe logs
    Parameters:
        - bufsize – how many bytes to buffer internally
        - host – the host to connect to (defaults to scribe_net_reader.host)
        - port – the port to connect to (defaults to scribe_net_reader.port)
        - automagic_recovery – whether to tail the stream, continuously retrying the connection (defaults to False)
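The constructor-argument-over-configuration fallback described above can be sketched as follows. This is not clog's implementation; a plain dict stands in for staticconf, and the host and port values are made-up placeholders:

```python
# Stand-in for staticconf-backed settings; these values are assumptions.
_config = {
    'scribe_net_reader.host': 'scribe.example.com',
    'scribe_net_reader.port': 3535,
}

def resolve_endpoint(host=None, port=None):
    # Explicit constructor arguments win; otherwise fall back to the
    # configured scribe_net_reader.* settings.
    host = host if host is not None else _config['scribe_net_reader.host']
    port = port if port is not None else _config['scribe_net_reader.port']
    return host, port

print(resolve_endpoint())                   # configured defaults
print(resolve_endpoint(host='other-host'))  # explicit host, configured port
```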
    list_streams()¶

        Get a context manager to use for reading list names.
    read_date_range(stream_name, start_date, end_date)¶

        Get a context manager to use for reading a stream over a date range.
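The (start_date, end_date) pair taken by read_date_range suggests an inclusive day-by-day walk of the range. A minimal illustrative helper (not part of clog) for that iteration pattern:

```python
from datetime import date, timedelta

# Illustrative helper, not clog's implementation: yield every date from
# start_date through end_date inclusive, one day at a time.
def iter_dates(start_date, end_date):
    current = start_date
    while current <= end_date:
        yield current
        current += timedelta(days=1)

days = list(iter_dates(date(2010, 1, 1), date(2010, 1, 3)))
print(days)  # three consecutive days, endpoints included
```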
exception clog.readers.NoLogDataError¶
class clog.readers.StreamTailer(stream, host=None, port=None, bufsize=4096, automagic_recovery=True, add_newlines=True, raise_on_start=True, timeout=None, reconnect_callback=None, use_kafka=False, lines=None, protocol_opts=None)¶

    Tail a Scribe stream from a tailing server.

    Example usage:

        tailer = StreamTailer('stream_name', host, port)
        for line in tailer:
            print(line)
    Configuration:

    This class can be configured by passing a host and port to the
    constructor, or by using staticconf with the following setting:

        scribe_tail_services
            (list of dicts {'host': host, 'port': port}) list of host and
            port addresses of scribe endpoints for tailing logs in real time
    Parameters:
        - stream (string) – the name of the stream, like 'ranger'
        - host (string) – the host name
        - port – the port to connect to
        - bufsize – the number of bytes to buffer
        - automagic_recovery (bool) – continue to retry the connection forever
        - add_newlines – add newlines to the items yielded by the iterator
        - raise_on_start (bool) – raise an error on a disconnect immediately after starting (otherwise return silently); defaults to True
        - timeout (int) – connection timeout
        - reconnect_callback (function) – callback called when reconnecting
        - protocol_opts (dict) – optional protocol parameters
    list_streams()¶

        Get a context manager to use for reading list names.
exception clog.readers.StreamTailerSetupError(host, port, message)¶
clog.readers.construct_conn_msg(stream, lines=None, protocol_opts=None)¶

    Return a connection message.

    Parameters:
        - stream – stream name
        - lines – number of messages to consume
        - protocol_opts – optional arguments
clog.readers.get_s3_info(hostname, stream_name=None)¶

    Returns (s3_host, s3_bucket(s), s3_prefix).

    If no stream name is provided (i.e. None), both the normal and tmp
    buckets are returned as a dict.
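The two return shapes can be sketched with a hypothetical stand-in. This is not clog's get_s3_info; every host, bucket, and prefix value below is a made-up placeholder — only the documented shape of the return value is real:

```python
# Hypothetical stand-in (NOT clog's implementation) showing only the
# documented return contract of get_s3_info: a single bucket string when
# a stream name is given, and a dict of normal/tmp buckets when it is None.
def get_s3_info_shape(hostname, stream_name=None):
    s3_host = 's3.example.com'        # assumed placeholder
    s3_prefix = 'logs/%s' % hostname  # assumed placeholder
    if stream_name is None:
        # Both buckets come back as a dict.
        buckets = {'normal': 'bucket-normal', 'tmp': 'bucket-tmp'}  # assumed names
    else:
        buckets = 'bucket-normal'  # assumed name
    return s3_host, buckets, s3_prefix

print(get_s3_info_shape('scribe01', 'ranger'))  # bucket is a single string
print(get_s3_info_shape('scribe01'))            # buckets come back as a dict
```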