Alternatively, for convenience, you can use the standard library's gzip module, e.g., for the reading phase, something like:
import csv
import gzip
import cloudstorage

compressed_flo = cloudstorage.open('objname', 'r')
uncompressed_flo = gzip.GzipFile(fileobj=compressed_flo, mode='rb')
csvReader = csv.reader(uncompressed_flo)
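The writing phase can mirror this by wrapping the Cloud Storage writer in a GzipFile. A minimal sketch, assuming the same App Engine cloudstorage client as above ('objname.gz' is just a placeholder object name):

import gzip
import cloudstorage

# Minimal sketch: compress data on the fly while writing it to a GCS object.
compressed_flo = cloudstorage.open('objname.gz', 'w')
gzip_flo = gzip.GzipFile(fileobj=compressed_flo, mode='wb')
gzip_flo.write(b'col1,col2\n')  # bytes are compressed as they are written
gzip_flo.close()                # flush the gzip trailer into the GCS writer
compressed_flo.close()          # finalize the GCS object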
Edit: Here is example code showing how to decompress and analyze a zlib or gzip stream chunk-wise, based purely on zlib:
import zlib
from collections import Counter

def stream(filename):
    # Yield the file contents in fixed-size chunks.
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(1024)
            if not chunk:
                break
            yield chunk

def decompress(stream):
    # Create a decompression object. The 32 + 15 window-bits value
    # auto-detects and skips a gzip wrapper, if present.
    z = zlib.decompressobj(32 + 15)
    for chunk in stream:
        r = z.decompress(chunk)
        if r:
            yield r

c = Counter()
s = stream("data.gz")
for chunk in decompress(s):
    for byte in chunk:
        c[byte] += 1
print(c)
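The same generator pipeline can read straight from Cloud Storage instead of a local file. A minimal sketch that reuses decompress() and Counter from above and assumes the App Engine cloudstorage client from the first example ('objname' is a placeholder):

def gcs_stream(objname, chunk_size=1024):
    # Yield raw (still compressed) chunks read from a GCS object.
    with cloudstorage.open(objname, 'r') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

c = Counter()
for chunk in decompress(gcs_stream('objname')):
    c.update(chunk)  # count every byte in the decompressed chunk
print(c)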
smart_open: utils for streaming large files (S3, HDFS, GCS, Azure Blob Storage, gzip, bz2, ...).
smart_open is a Python 3 library for efficient streaming of very large files from/to storage such as S3, GCS, Azure Blob Storage, HDFS, WebHDFS, HTTP, HTTPS, SFTP, or the local filesystem. It supports transparent, on-the-fly (de-)compression for a variety of formats.
smart_open is well-tested, well-documented, and has a simple Pythonic API:
>>> from smart_open import open
>>>
>>> # stream lines from an S3 object
>>> for line in open('s3://commoncrawl/robots.txt'):
...    print(repr(line))
...    break
'User-Agent: *\n'

>>> # stream from/to compressed files, with transparent (de)compression:
>>> for line in open('smart_open/tests/test_data/1984.txt.gz', encoding='utf-8'):
...    print(repr(line))
'It was a bright cold day in April, and the clocks were striking thirteen.\n'
'Winston Smith, his chin nuzzled into his breast in an effort to escape the vile\n'
'wind, slipped quickly through the glass doors of Victory Mansions, though not\n'
'quickly enough to prevent a swirl of gritty dust from entering along with him.\n'

>>> # can use context managers too:
>>> with open('smart_open/tests/test_data/1984.txt.gz') as fin:
...    with open('smart_open/tests/test_data/1984.txt.bz2', 'w') as fout:
...        for line in fin:
...            fout.write(line)
74
80
78
79

>>> # can use any IOBase operations, like seek
>>> with open('s3://commoncrawl/robots.txt', 'rb') as fin:
...    for line in fin:
...        print(repr(line.decode('utf-8')))
...        break
...    offset = fin.seek(0)  # seek to the beginning
...    print(fin.read(4))
'User-Agent: *\n'
b'User'

>>> # stream from HTTP
>>> for line in open('http://example.com/index.html'):
...    print(repr(line))
...    break
'<!doctype html>\n'
Other examples of URLs that smart_open accepts:
s3://my_bucket/my_key
s3://my_key:my_secret@my_bucket/my_key
s3://my_key:my_secret@my_server:my_port@my_bucket/my_key
gs://my_bucket/my_blob
azure://my_bucket/my_blob
hdfs:///path/file
hdfs://path/file
webhdfs://host:port/path/file
./local/path/file
~/local/path/file
local/path/file
./local/path/file.gz
file:///home/user/file
file:///home/user/file.bz2
[ssh|scp|sftp]://username@host//path/file
[ssh|scp|sftp]://username@host/path/file
[ssh|scp|sftp]://username:password@host/path/file
smart_open supports a wide range of storage solutions, including AWS S3, Google Cloud and Azure. Each individual solution has its own dependencies. By default, smart_open does not install any dependencies, in order to keep the installation size small. You can install these dependencies explicitly using:
pip install smart_open[azure] # Install Azure deps
pip install smart_open[gcs] # Install GCS deps
pip install smart_open[s3] # Install S3 deps
Or, if you don’t mind installing a large number of third party libraries, you can install all dependencies using:
pip install smart_open[all]
For detailed API info, see the online help:
help('smart_open')
For the sake of simplicity, the examples below assume you have all the dependencies installed, i.e. you have done:
pip install smart_open[all]
>>> import os, boto3
>>>
>>> # stream content *into* S3 (write mode) using a custom session
>>> session = boto3.Session(
...     aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
...     aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
... )
>>> url = 's3://smart-open-py37-benchmark-results/test.txt'
>>> with open(url, 'wb', transport_params={'client': session.client('s3')}) as fout:
...     bytes_written = fout.write(b'hello world!')
...     print(bytes_written)
12
# stream from HDFS
for line in open('hdfs://user/hadoop/my_file.txt', encoding='utf8'):
    print(line)

# stream from WebHDFS
for line in open('webhdfs://host:port/user/hadoop/my_file.txt'):
    print(line)

# stream content *into* HDFS (write mode):
with open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')

# stream content *into* WebHDFS (write mode):
with open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')

# stream from a completely custom s3 server, like s3proxy:
for line in open('s3u://user:secret@host:port@mybucket/mykey.txt'):
    print(line)

# stream to a Digital Ocean Spaces bucket, providing credentials from a boto3 profile
session = boto3.Session(profile_name='digitalocean')
client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com')
transport_params = {'client': client}
with open('s3://bucket/key.txt', 'wb', transport_params=transport_params) as fout:
    fout.write(b'here we stand')

# stream from GCS
for line in open('gs://my_bucket/my_file.txt'):
    print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')

# stream from Azure Blob Storage
connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
transport_params = {
    'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
}
for line in open('azure://mycontainer/myfile.txt', transport_params=transport_params):
    print(line)

# stream content *into* Azure Blob Storage (write mode):
connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
transport_params = {
    'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
}
with open('azure://mycontainer/my_file.txt', 'wb', transport_params=transport_params) as fout:
    fout.write(b'hello world')
By default, smart_open determines the compression algorithm to use based on the file extension.
>>> from smart_open import open, register_compressor
>>>
>>> with open('smart_open/tests/test_data/1984.txt.gz') as fin:
...    print(fin.read(32))
It was a bright cold day in Apri
You can override this behavior to either disable compression, or explicitly specify the algorithm to use. To disable compression:
>>> from smart_open import open, register_compressor
>>>
>>> with open('smart_open/tests/test_data/1984.txt.gz', 'rb', compression='disable') as fin:
...    print(fin.read(32))
b'\x1f\x8b\x08\x08\x85F\x94\\\x00\x031984.txt\x005\x8f=r\xc3@\x08\x85{\x9d\xe2\x1d@'
To specify the algorithm explicitly (e.g. for non-standard file extensions):
>>> from smart_open import open, register_compressor
>>>
>>> with open('smart_open/tests/test_data/1984.txt.gzip', compression='.gz') as fin:
...    print(fin.read(32))
It was a bright cold day in Apri
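Since register_compressor is imported above, here is a sketch of how a custom handler might be registered for an extension smart_open does not handle out of the box. The '.xz' extension and the _handle_xz name are illustrative; treat the exact callback signature as an assumption and check help('smart_open') against your installed version:

>>> import lzma
>>> from smart_open import open, register_compressor
>>>
>>> # Illustrative sketch: map the '.xz' extension to an lzma-based handler.
>>> def _handle_xz(file_obj, mode):
...    return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)
>>>
>>> register_compressor('.xz', _handle_xz)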
Each option involves setting up its own set of parameters. For example, for accessing S3, you often need to set up authentication, like API keys or a profile name. smart_open’s open function accepts a keyword argument transport_params which accepts additional parameters for the transport layer. Here are some examples of using this parameter:
>>> import boto3
>>>
>>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(client=boto3.client('s3')))
>>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(buffer_size=1024))
For the full list of keyword arguments supported by each transport option, see the documentation:
help('smart_open.open')
resource "google_logging_organization_sink" "audit-logging-sink" { name = "org-audit-logging" org_id = var.org_id # apply to the entire org include_children = true # Can export to pubsub, cloud storage, or bigquery destination = "storage.googleapis.com/${google_storage_bucket.bucket-audit-logging.name}" # Log all AuditLogs filter = "protoPayload.\"@type\"=type.googleapis.com/google.cloud.audit.AuditLog" }
func CompressGCSLogs(ctx context.Context, e GCSEvent) error {
    logging.Logger.Debugf("Compressing file [%s] in [%s] bucket.", e.Name, e.Bucket)

    client, err := storage.NewClient(ctx)
    if err != nil {
        logging.Logger.Error(err)
        return err
    }

    // 1. Read the object from the source bucket
    rc, err := client.Bucket(e.Bucket).Object(e.Name).NewReader(ctx)
    if err != nil {
        logging.Logger.Error(err)
        return err
    }

    // 2. Destination bucket and object name (with added .gzip extension)
    wc := client.Bucket("all-logs-export-compressed").Object(e.Name + ".gzip").NewWriter(ctx)

    // 3. Create a gzip writer that compresses into the destination object
    gzipWriter := gzip.NewWriter(wc)

    // 4. Write the compressed version of the file
    if _, err := io.Copy(gzipWriter, rc); err != nil {
        logging.Logger.Error(err)
        return err
    }

    // Close the gzip writer first so the gzip footer is flushed,
    // then close the object writer to finalize the upload.
    if err := gzipWriter.Close(); err != nil {
        logging.Logger.Error(err)
        return err
    }
    if err := wc.Close(); err != nil {
        logging.Logger.Error(err)
        return err
    }
    rc.Close()
    return nil
}
gcloud functions deploy CompressGCSLogs \
--runtime go113 \
--trigger-resource all-logs-export \
--trigger-event google.storage.object.finalize \
--project <your-project-name> \
--region <your-region> \
--timeout=540s
gsutil du -hc "gs://abc-audit-logging/cloudaudit.googleapis.com/*/2020/10/02/"
# 61.68 GiB total before compression

gsutil du -hc "gs://abc-audit-logging-compressed/cloudaudit.googleapis.com/*/2020/10/02/"
# 4.66 GiB total after compression
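To spot-check the compressed output, the compression override shown earlier with smart_open can be reused. A minimal sketch, assuming smart_open[gcs] is installed; the object path below is illustrative, and compression='.gz' is passed explicitly because the function writes objects with a non-standard '.gzip' extension:

from smart_open import open

# Illustrative object path in the destination bucket; replace with a real one.
url = 'gs://all-logs-export-compressed/cloudaudit.googleapis.com/example-log.json.gzip'

with open(url, encoding='utf-8', compression='.gz') as fin:
    for line in fin:
        print(line)   # print the first decompressed log line
        break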