Python Data Streaming to Google Cloud Storage with Resumable Uploads
A few days ago I spent a large chunk of my afternoon working on implementing memory-efficient data streaming to Google Cloud Storage (GCS) from a Python runtime.
There were several roadblocks along the way and I'd like to create the documentation that I wish I could find while working on the problem.
This article uses Python 3.6.4 but can be adapted for other Python versions.
GCS Support in the google-cloud Module
The google-cloud package is a giant collection of modules that can be used to interface with all of the Google Cloud Platform services, so it's a great place to start.
python -m pip install -U google-cloud
Inside the google-cloud package is a module called google.cloud.storage which deals with all things GCS.
I downloaded and set up my GOOGLE_APPLICATION_CREDENTIALS locally and opened up a Python console to test out some of the functionality. I was able to quickly connect to GCS, create a Bucket, create a Blob, and upload binary data to the Blob.
```python
from google.cloud import storage

client = storage.Client()
bucket = client.create_bucket('test-bucket')
blob = bucket.blob('test-blob')
blob.upload_from_string(
    data=b'x' * 1024,
    content_type='application/octet-stream',
    client=client,
)
```
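For completeness, here is a minimal sketch of how the credentials can be wired up before creating the client; the JSON key path is a placeholder, and exporting the variable in your shell works just as well:

```python
import os

# Placeholder path to a service-account JSON key; adjust for your setup.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account.json'

from google.cloud import storage

client = storage.Client()  # Picks up the credentials from the environment
```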
One thing I immediately noticed was that for building Armonaut my use-case would be progressively streaming output to GCS without saving the output to the file-system of the compute instance. There had to be a way to stream data rather than upload it all in one go.
Resumable Uploads to the Rescue!
The initial research I did uncovered Resumable Uploads as an option for Google Cloud Storage. From their description, they have the following use-cases:
- You are uploading a large file.
- The chances of network failure are high.
- You don't know the size of the file when the upload starts.
Reasons #1 and #3 both applied to my use-case so I started investigating further.
I searched the google-cloud documentation for a mention of resumable uploads, which yielded the Blob.create_resumable_upload_session() method. This method starts a Resumable Upload and returns a URL.
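As a quick sanity check, calling the method looks something like this (a minimal sketch reusing the client and blob objects from the console session above):

```python
# Returns the session URL that subsequent chunk uploads are sent to.
session_url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    client=client,
)
print(session_url)
```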
Resumable Media Package
The set of interactions that must occur for a Resumable Upload to complete successfully is quite complex and I suspected there was already a package that handled this exchange. I found the google-resumable-media package with a bit of Googling. ;-)
python -m pip install -U google-resumable-media
The key part of this package I was interested in is the google.resumable_media.requests.ResumableUpload class, which takes an authorized transport and then allows you to upload data in chunks and recover when errors are detected.
So far this was the code I was working with:
```python
import io

from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage
from google.resumable_media.requests import ResumableUpload

chunk_size = 256 * 1024  # Minimum chunk-size supported by GCS
stream = io.BytesIO(b'x' * (1024 * 1024))  # Fake data stream

client = storage.Client()
bucket = client.bucket('test-bucket')
blob = bucket.blob('test-blob')

# Create a Resumable Upload
url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    client=client,
)

# Pass the URL off to the ResumableUpload object
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size,
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name},
)
```
Trouble was, I was getting an error on upload.initiate(). It was complaining that there was no Location header on the response. I investigated this issue and found that create_resumable_upload_session() was already doing the work of upload.initiate()! I removed that step and instead used the API endpoint provided in the Resumable Upload documentation.
```python
# Create a Resumable Upload
url = (
    f'https://www.googleapis.com/upload/storage/v1/b/'
    f'{bucket.name}/o?uploadType=resumable'
)
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size,
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name},
)
```
This snippet worked to start a Resumable Upload! Now to stream the data.
Streaming Data and stream_final=False
The ResumableUpload object has a method called transmit_next_chunk which tells the upload that the next chunk may be uploaded. While reading the documentation about this method I found stream_final, which is a parameter of the ResumableUpload.initiate method.
I found that if stream_final is set to False then the ResumableUpload will detect the "end" of the stream when a chunk is transmitted that is less than the chunk_size parameter set in its constructor. This meant that to stream an unknown amount of data, each chunk would have to be at least 256 KiB and output would have to be buffered until that size was reached before being transmitted.
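To make that behavior concrete, here is a minimal sketch (not from the original snippets) of the chunk loop, assuming `upload`, `transport`, and `stream` are the objects from the code above and that `initiate()` was called with `stream_final=False`:

```python
# Each transmit_next_chunk() call reads up to chunk_size bytes from the
# stream and sends them. With stream_final=False the upload is only
# finalized once a chunk shorter than chunk_size is transmitted, which is
# why outgoing data has to be buffered into >=256 KiB pieces until the
# very last chunk.
while not upload.finished:
    upload.transmit_next_chunk(transport)
```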
Enjoying this post? Check out my Dev Blog for more.
Putting it All Together
After getting a simple example working I created a class that handles a single stream of unknown-length data being uploaded to a blob progressively and recovers from network errors if detected.
To accomplish this I implemented an object that both buffered data and had a file-like interface, so that it could be used by ResumableUpload as a stream and be passed into other functions that require file-like objects for writing data.
Here is my final implementation:
```python
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage


class GCSObjectStreamUpload(object):
    def __init__(
        self,
        client: storage.Client,
        bucket_name: str,
        blob_name: str,
        chunk_size: int = 256 * 1024,
    ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials,
        )
        self._request = None  # type: requests.ResumableUpload

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        if exc_type is None:
            self.stop()

    def start(self):
        # Begin the Resumable Upload session for the blob.
        url = (
            f'https://www.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url,
            chunk_size=self._chunk_size,
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def stop(self):
        # Transmit whatever is left in the buffer; a chunk smaller than
        # chunk_size signals the end of the stream.
        self._request.transmit_next_chunk(self._transport)

    def write(self, data: bytes) -> int:
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        # Transmit full chunks, recovering from network errors if detected.
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len

    def read(self, chunk_size: int) -> bytes:
        # I'm not good with efficient no-copy buffering so if this is
        # wrong or there's a better way to do this let me know! :-)
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read
```
The class can be used like so:
```python
client = storage.Client()

with GCSObjectStreamUpload(
    client=client,
    bucket_name='test-bucket',
    blob_name='test-blob',
) as s:
    for _ in range(1024):
        s.write(b'x' * 1024)
```
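Because the class exposes a file-like write() method, it can also be handed to utilities that expect a writable file object. Here is a hypothetical example (not from the original post) that streams a local file into GCS with shutil.copyfileobj; the file name is a placeholder:

```python
import shutil

client = storage.Client()

with open('large-local-file.bin', 'rb') as src:
    with GCSObjectStreamUpload(
        client=client,
        bucket_name='test-bucket',
        blob_name='large-local-file.bin',
    ) as dst:
        # copyfileobj() calls dst.write() in 256 KiB pieces, so the upload
        # streams without holding the whole file in memory.
        shutil.copyfileobj(src, dst, length=256 * 1024)
```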
Thanks for reading!
Source: https://dev.to/sethmlarson/python-data-streaming-to-google-cloud-storage-with-resumable-uploads-458h