mrjob: set up logging on EMR

Here is an example to get logging on stdout (Python 3):

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
import re
import sys
import logging

log = logging.getLogger(__name__)

WORD_RE = re.compile(r'[\w]+')

class MostUsedWords(MRJob):

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        # route mrjob's and this module's log records to the given stream
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_get_words,
                   reducer=self.reduce_get_words),
            MRStep(reducer=self.reducer_find_max)
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_get_words(self, word, counts):
        yield (word, sum(counts))

    def reduce_get_words(self, word, counts):
        counts = list(counts)  # materialize so we can both log and sum
        log.info(word + "\t" + str(counts))
        yield None, (sum(counts), word)

    def reducer_find_max(self, key, values):
        # values is an iterable of (count, word) tuples
        yield max(values)


if __name__ == '__main__':
    MostUsedWords.run()
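
Assuming the class above is saved as most_used_words.py (a file name chosen here for illustration), it can be run in the usual mrjob way; -r emr is the standard switch for the EMR runner:

$ python most_used_words.py input.txt         # inline runner; log output shows up in your terminal
$ python most_used_words.py -r emr input.txt  # EMR runner

When running on EMR, the log.info() lines from the reducer are not part of the job output; they have to be pulled from the cluster's logs afterwards, as the suggestions below describe.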

Suggestion : 2

In addition to looking at S3, mrjob can be configured to also use SSH to fetch error logs directly from the master and worker nodes. This can speed up debugging significantly, since EMR only transfers logs to S3 every five minutes. mrjob can fetch logs from persistent jobs even without SSH set up, but then it has to pause 10 minutes to wait for EMR to transfer logs to S3, which defeats the purpose of rapid iteration.

When troubleshooting a job, it can also be convenient to use a persistent cluster, to avoid having to wait for bootstrapping on every run.
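
For the SSH log fetching to work, mrjob needs SSH credentials for the cluster. A minimal mrjob.conf sketch (the key pair name and .pem path are placeholders for your own values):

runners:
  emr:
    ec2_key_pair: EMR
    ec2_key_pair_file: ~/.ssh/EMR.pem

With credentials in place, creating a persistent cluster looks like this: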

$ mrjob create-cluster
Using configs in /Users/davidmarin/.mrjob.conf
Using s3://mrjob-35cdec11663cb1cb/tmp/ as our temp dir on S3
Creating persistent cluster to run several jobs in...
Creating temp directory /var/folders/zv/jmtt5bxs6xl3kzt38470hcxm0000gn/T/no_script.davidmarin.20160324.231018.720057
Copying local files to s3://mrjob-35cdec11663cb1cb/tmp/no_script.davidmarin.20160324.231018.720057/files/...
j-3BYHP30KB81XE
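
The cluster ID printed on the last line can be passed to subsequent runs so they reuse the same cluster. A sketch, assuming your job lives in my_job.py (with a reasonably recent mrjob, the switch for reusing an existing cluster is --cluster-id):

$ python my_job.py -r emr --cluster-id j-3BYHP30KB81XE input.txt

Now you can fix the bug and try again, without having to wait for a new cluster to bootstrap.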

Suggestion : 3

Out of all the options, the only one that really works is writing to stderr directly (sys.stderr.write) or using a logger with a StreamHandler pointed at stderr. Be sure to keep the logs in your runners.emr.cleanup configuration, so that they can still be retrieved after the job has finished (successfully or with an error).
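
To make the stderr approach concrete, here is a minimal sketch (not taken from the original answers; the logger setup and messages are illustrative) of both variants inside a job script:

import logging
import sys

# a logger with a StreamHandler pointed at stderr; EMR captures each task's
# stderr in its task attempt logs
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(handler)

# either of these lines, placed inside a mapper or reducer, shows up in the
# task's stderr log
sys.stderr.write('processing a record\n')
log.info('processing a record')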

Suggestion : 4

Usage examples of mrjob.job.MRJob.set_up_logging:

def main(cl_args=None):
    arg_parser = _make_arg_parser()
    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    runner_kwargs = {
        k: v
        for k, v in options.__dict__.items()
        if k not in ('quiet', 'verbose', 'step_id')
    }

    runner = EMRJobRunner(**runner_kwargs)
    emr_client = runner.make_emr_client()

    # pick step
    step = _get_step(emr_client, options.cluster_id, options.step_id)

    if not step:
        raise SystemExit(1)

    if step['Status']['State'] != 'FAILED':
        log.warning('step %s has state %s, not FAILED' %
                    (step['Id'], step['Status']['State']))

file = open(output_file, 'w+')
for line in runner.stream_output():
    file.write(line)
file.close()

if __name__ == '__main__':
    # Make sure we got proper arguments.
    if len(argv) < 3 or len(argv) > 5:
        exit("Usage: python driver.py " +
             "[input file] [output file] [options]")

    # Determine if we're running locally or on EMR.
    LOCAL = not (len(argv) > 3 and argv[3] == '-emr')

    # Output all logging information to STDOUT.
    MRJob.set_up_logging(verbose=True, stream=stdout)

    # Run all the MapReduce workers sequentially.
    run_job(wordFrequency, argv[1], 'frequencies.txt')
    run_job(wordCount, 'frequencies.txt', 'counts.txt')
    run_job(corpusFrequency, 'counts.txt', 'corpus.txt')
    run_job(calculateScore, 'corpus.txt', argv[2])

    help='ID of cluster to run command on')
arg_parser.add_argument(dest='cmd_string',
                        help='command to run, as a single string')

_add_basic_args(arg_parser)
_add_runner_args(
    arg_parser,
    {'ec2_key_pair_file', 'ssh_bin'} | _filter_by_role(
        EMRJobRunner.OPT_NAMES, 'connect')
)

_alphabetize_actions(arg_parser)

options = arg_parser.parse_args(cl_args)

MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

runner_kwargs = options.__dict__.copy()
for unused_arg in ('cluster_id', 'cmd_string', 'output_dir',
                   'quiet', 'verbose'):
    del runner_kwargs[unused_arg]

cmd_args = shlex_split(options.cmd_string)

output_dir = os.path.abspath(options.output_dir or options.cluster_id)

with EMRJobRunner(
        cluster_id=options.cluster_id, **runner_kwargs) as runner:
    _run_on_all_nodes(runner, output_dir, cmd_args)

def main(cl_args=None):
    # parse command-line args
    arg_parser = _make_arg_parser()
    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    # connect to EMR and terminate the given cluster
    runner = EMRJobRunner(**_runner_kwargs(options))
    log.debug('Terminating cluster %s' % options.cluster_id)
    runner.make_emr_client().terminate_job_flows(
        JobFlowIds=[options.cluster_id])
    log.info('Terminated cluster %s' % options.cluster_id)

def main(cl_args=None):
    arg_parser = _make_arg_parser()
    options = arg_parser.parse_args(cl_args)

    MRJob.set_up_logging(quiet=options.quiet,
                         verbose=options.verbose)

    max_mins_idle = options.max_mins_idle

    _maybe_terminate_clusters(
        dry_run=options.dry_run,
        max_mins_idle=max_mins_idle,
        unpooled_only=options.unpooled_only,
        now=_boto3_now(),
        pool_name=options.pool_name,
        pooled_only=options.pooled_only,
        max_mins_locked=options.max_mins_locked,
        quiet=options.quiet,
        **_runner_kwargs(options)
    )

def main(args=None):
    option_parser = make_option_parser()
    try:
        options = parse_args(option_parser, args)
    except OptionError:
        option_parser.error('This tool takes exactly one argument.')

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    with EMRJobRunner(**runner_kwargs(options)) as runner:
        perform_actions(options, runner)