When troubleshooting a job, it can be convenient to use a persistent cluster to avoid having to wait for bootstrapping on every run. mrjob can fetch logs from persistent jobs even without SSH set up, but it has to pause 10 minutes to wait for EMR to transfer logs to S3, which defeats the purpose of rapid iteration. In addition to looking at S3, mrjob can be configured to use SSH to fetch error logs directly from the master and worker nodes, which speeds up debugging significantly (EMR only transfers logs to S3 every five minutes).
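To enable that SSH log fetching, you point mrjob at an EC2 key pair in mrjob.conf. A minimal sketch, where the key pair name and file path are placeholders:

runners:
  emr:
    ec2_key_pair: EMR
    ec2_key_pair_file: ~/.ssh/EMR.pem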
$ mrjob create-cluster
Using configs in /Users/davidmarin/.mrjob.conf
Using s3://mrjob-35cdec11663cb1cb/tmp/ as our temp dir on S3
Creating persistent cluster to run several jobs in...
Creating temp directory /var/folders/zv/jmtt5bxs6xl3kzt38470hcxm0000gn/T/no_script.davidmarin.20160324.231018.720057
Copying local files to s3://mrjob-35cdec11663cb1cb/tmp/no_script.davidmarin.20160324.231018.720057/files/...
j-3BYHP30KB81XE
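Now you can fix the bug and try again, without having to wait for a new cluster to bootstrap, by passing the cluster ID printed above back to your job. A sketch, assuming the job script is mr_most_used_words.py and the input is input.txt (both placeholder names):

$ python mr_most_used_words.py -r emr --cluster-id=j-3BYHP30KB81XE input.txt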
Out of all the options, the only one that really works is writing to stderr, either directly (sys.stderr.write) or through a logger with a StreamHandler pointed at stderr. The logs can then be retrieved after the job has finished (successfully or with an error). Be sure your runners.emr.cleanup configuration keeps the logs.
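One way to do that in mrjob.conf is to turn cleanup off entirely; a minimal sketch (NONE is just one possible value, and it disables all cleanup, not only log cleanup):

runners:
  emr:
    cleanup: NONE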
Here is an example to get logging on stdout (Python 3):
from mrjob.job import MRJob
from mrjob.job import MRStep
from mrjob.util import log_to_stream, log_to_null
import re
import sys
import logging

log = logging.getLogger(__name__)

WORD_RE = re.compile(r'[\w]+')


class MostUsedWords(MRJob):

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        # send mrjob's logging and this module's logging to the given stream
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_get_words,
                   reducer=self.reduce_get_words),
            MRStep(reducer=self.reducer_find_max)
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_get_words(self, word, counts):
        yield (word, sum(counts))

    def reduce_get_words(self, word, counts):
        # materialize the iterator once so we can both log it and sum it
        counts = list(counts)
        log.info(word + "\t" + str(counts))
        yield None, (sum(counts), word)

    def reducer_find_max(self, key, value):
        # value is an iterable of (count, word) tuples
        yield max(value)


if __name__ == '__main__':
    MostUsedWords.run()
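For the stderr-based approach mentioned above, here is a minimal sketch; the job class and what it counts are placeholders, not part of the original example:

import logging
import sys

from mrjob.job import MRJob

# module-level logger with a StreamHandler pointed at stderr
log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler(sys.stderr))
log.setLevel(logging.INFO)


class MRStderrLoggingJob(MRJob):
    """Hypothetical job used only to illustrate logging to stderr."""

    def mapper(self, _, line):
        # a direct write also works: sys.stderr.write('...\n')
        log.info('processing line of length %d', len(line))
        yield 'chars', len(line)

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRStderrLoggingJob.run()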
More examples of mrjob.job.MRJob.set_up_logging in use:
def main(cl_args=None):
    arg_parser = _make_arg_parser()
    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    runner_kwargs = {k: v for k, v in options.__dict__.items()
                     if k not in ('quiet', 'verbose', 'step_id')}

    runner = EMRJobRunner(**runner_kwargs)
    emr_client = runner.make_emr_client()

    # pick step
    step = _get_step(emr_client, options.cluster_id, options.step_id)

    if not step:
        raise SystemExit(1)

    if step['Status']['State'] != 'FAILED':
        log.warning('step %s has state %s, not FAILED' %
                    (step['Id'], step['Status']['State']))
    # (tail of a run_job helper: write the finished runner's output to a local file)
    file = open(output_file, 'w+')
    for line in runner.stream_output():
        file.write(line)
    file.close()

if __name__ == '__main__':
    # Make sure we got proper arguments.
    if len(argv) < 3 or len(argv) > 5:
        exit("Usage: python driver.py " +
             "[input file] [output file] [options]")

    # Determine if we're running locally or on EMR.
    # (argv and stdout presumably come from `from sys import argv, stdout`
    # earlier in the script)
    LOCAL = not (len(argv) > 3 and argv[3] == '-emr')

    # Output all logging information to STDOUT.
    MRJob.set_up_logging(verbose=True, stream=stdout)

    # Run all the MapReduce workers sequentially.
    run_job(wordFrequency, argv[1], 'frequencies.txt')
    run_job(wordCount, 'frequencies.txt', 'counts.txt')
    run_job(corpusFrequency, 'counts.txt', 'corpus.txt')
    run_job(calculateScore, 'corpus.txt', argv[2])
        help='ID of cluster to run command on')
    arg_parser.add_argument(dest='cmd_string',
                            help='command to run, as a single string')

    _add_basic_args(arg_parser)
    _add_runner_args(
        arg_parser,
        {'ec2_key_pair_file', 'ssh_bin'} | _filter_by_role(
            EMRJobRunner.OPT_NAMES, 'connect')
    )

    _alphabetize_actions(arg_parser)

    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    runner_kwargs = options.__dict__.copy()
    for unused_arg in ('cluster_id', 'cmd_string', 'output_dir',
                       'quiet', 'verbose'):
        del runner_kwargs[unused_arg]

    cmd_args = shlex_split(options.cmd_string)

    output_dir = os.path.abspath(options.output_dir or options.cluster_id)

    with EMRJobRunner(
            cluster_id=options.cluster_id, **runner_kwargs) as runner:
        _run_on_all_nodes(runner, output_dir, cmd_args)
def main(cl_args=None):
    # parse command-line args
    arg_parser = _make_arg_parser()
    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    # create a runner so we can get an EMR client
    runner = EMRJobRunner(**_runner_kwargs(options))

    log.debug('Terminating cluster %s' % options.cluster_id)
    runner.make_emr_client().terminate_job_flows(
        JobFlowIds=[options.cluster_id])
    log.info('Terminated cluster %s' % options.cluster_id)
def main(cl_args=None):
    arg_parser = _make_arg_parser()
    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    max_mins_idle = options.max_mins_idle

    _maybe_terminate_clusters(
        dry_run=options.dry_run,
        max_mins_idle=max_mins_idle,
        unpooled_only=options.unpooled_only,
        now=_boto3_now(),
        pool_name=options.pool_name,
        pooled_only=options.pooled_only,
        max_mins_locked=options.max_mins_locked,
        quiet=options.quiet,
        **_runner_kwargs(options)
    )
def main(args=None):
    option_parser = make_option_parser()
    try:
        options = parse_args(option_parser, args)
    except OptionError:
        option_parser.error('This tool takes exactly one argument.')

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    with EMRJobRunner(**runner_kwargs(options)) as runner:
        perform_actions(options, runner)