Python Code to Upload Into Google Cloud Storage

Seth Michael Larson

Python Data Streaming to Google Cloud Storage with Resumable Uploads

A few days ago I spent a large chunk of my afternoon working on implementing memory-efficient data streaming to Google Cloud Storage (GCS) from a Python runtime.

There were several roadblocks along the way, and I'd like to write the documentation that I wish I could have found while working on the problem.

This article uses Python 3.6.4 but can be adapted for other Python versions.

GCS support in google-cloud Module

The google-cloud package is a giant collection of modules that can be used to interface with all of the Google Cloud Platform services, so it's a great place to start.

python -m pip install -U google-cloud

Inside the google-cloud package is a module called google.cloud.storage, which deals with all things GCS.

I downloaded and set up my GOOGLE_APPLICATION_CREDENTIALS locally and opened up a Python console to test out some of the functionality. I was able to quickly connect to GCS, create a Bucket, create a Blob, and upload binary data to the Blob.

```python
from google.cloud import storage

client = storage.Client()
bucket = client.create_bucket('test-bucket')
# Blobs are created through their Bucket, not through the Client.
blob = bucket.blob('test-blob')
blob.upload_from_string(
    data=b'x' * 1024,
    content_type='application/octet-stream',
    client=client
)
```

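If storage.Client() fails at this step, the usual cause is the credentials variable. Below is a small stdlib-only pre-flight check you could run first. It is a sketch: the helper name `load_credentials_file` is hypothetical, and the function only confirms that GOOGLE_APPLICATION_CREDENTIALS is set and points at a readable JSON file. It does not verify that the credentials themselves are valid.

```python
import json
import os
from typing import Mapping, Optional


def load_credentials_file(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: parse the JSON key file named by the env variable."""
    env = os.environ if environ is None else environ
    path = env.get('GOOGLE_APPLICATION_CREDENTIALS')
    if not path:
        raise RuntimeError('GOOGLE_APPLICATION_CREDENTIALS is not set')
    if not os.path.isfile(path):
        raise RuntimeError(f'GOOGLE_APPLICATION_CREDENTIALS points at a missing file: {path}')
    with open(path) as f:
        # Raises json.JSONDecodeError if the file is not valid JSON.
        return json.load(f)
```

Calling `load_credentials_file()` before `storage.Client()` turns a confusing authentication error into an explicit message about the environment.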

One thing I immediately noticed was that, for building Armonaut, my use-case would be progressively streaming output to GCS without saving it to the file-system of the compute instance. There had to be a way to stream data rather than upload it all in one go.

Resumable Uploads to the Rescue!

My initial research uncovered Resumable Uploads as an option for Google Cloud Storage. The documentation lists the following use-cases:

  • You are uploading a large file.
  • The chances of network failure are high.
  • You don't know the size of the file when the upload starts.

Reasons #1 and #3 both applied to my use-case so I started investigating further.

I searched the google-cloud documentation for a mention of resumable uploads which yielded the Blob.create_resumable_upload_session() method. This method starts a Resumable Upload and returns a URL.

Resumable Media Package

The set of interactions that must occur for a Resumable Upload to complete successfully was quite complex, and I suspected there was already a package that handles this exchange. I found the google-resumable-media package with a bit of Googling. ;-)

python -m pip install -U google-resumable-media

The part of this package I was interested in is the google.resumable_media.requests.ResumableUpload class, which takes an authorized transport and then allows you to upload data in chunks and recover when errors are detected.
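To make that "quite complex" exchange more concrete, here is a stdlib-only sketch of the header bookkeeping involved when the total size is unknown. My understanding of the GCS resumable upload documentation is as follows. Each intermediate chunk is sent with `Content-Range: bytes START-END/*`. Only the final chunk states the total size. After an interruption, a status query answered with HTTP 308 carries a `Range: bytes=0-N` header that says how much the server has stored. The helper names `content_range` and `bytes_persisted` are hypothetical and are not part of google-resumable-media, which builds these headers for you.

```python
from typing import Optional

# Hypothetical helpers illustrating resumable-upload headers for a stream of
# unknown length; google-resumable-media handles this bookkeeping internally.


def content_range(start: int, payload_len: int, total: Optional[int]) -> str:
    """Content-Range value for one chunk; total=None means size still unknown."""
    if payload_len == 0:
        if total is None:
            raise ValueError('an empty chunk must state the total size')
        # An empty final chunk only announces the total size.
        return f'bytes */{total}'
    end = start + payload_len - 1
    size = '*' if total is None else str(total)
    return f'bytes {start}-{end}/{size}'


def bytes_persisted(range_header: Optional[str]) -> int:
    """Bytes the server has stored, from a 308 response's `Range: bytes=0-N`."""
    if not range_header:
        return 0  # no Range header: nothing persisted yet
    _, _, span = range_header.partition('=')
    _, _, last = span.partition('-')
    return int(last) + 1


CHUNK = 256 * 1024
print(content_range(0, CHUNK, None))                   # bytes 0-262143/*
print(content_range(CHUNK, CHUNK, None))               # bytes 262144-524287/*
print(content_range(2 * CHUNK, 100, 2 * CHUNK + 100))  # bytes 524288-524387/524388
print(bytes_persisted('bytes=0-262143'))               # 262144
```

In this sketch, recovery amounts to asking the server for `bytes_persisted(...)` and then resending the stream from that offset.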

So far this was the code I was working with:

```python
import io

from google.auth.transport.requests import AuthorizedSession
from google.cloud import storage
from google.resumable_media.requests import ResumableUpload

chunk_size = 256 * 1024  # Minimum chunk-size supported by GCS
stream = io.BytesIO(b'x' * (1024 * 1024))  # Fake data stream

client = storage.Client()
bucket = client.bucket('test-bucket')
blob = bucket.blob('test-blob')

# Create a Resumable Upload
url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    client=client
)

# Pass the URL off to the ResumableUpload object
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)
```


The problem was that I was getting an error on upload.initiate(). It was complaining that there was no Location header on the response. I investigated this issue and found that create_resumable_upload_session() was already doing the work of upload.initiate()! I removed that step and instead used the API endpoint provided in the Resumable Upload documentation.

```python
# Create a Resumable Upload
url = (
    f'https://www.googleapis.com/upload/storage/v1/b/'
    f'{bucket.name}/o?uploadType=resumable'
)
upload = ResumableUpload(
    upload_url=url,
    chunk_size=chunk_size
)
transport = AuthorizedSession(credentials=client._credentials)

# Start using the Resumable Upload
upload.initiate(
    transport=transport,
    content_type='application/octet-stream',
    stream=stream,
    metadata={'name': blob.name}
)
```


This snippet worked to start a Resumable Upload! Now to stream the data.

Streaming Data and stream_final=False

The ResumableUpload object has a method called transmit_next_chunk, which tells the upload that the next chunk may be uploaded. While reading the documentation about this method, I found stream_final, which is a parameter of the ResumableUpload.initiate method.

I found that if stream_final is set to False, the ResumableUpload will detect the "end" of the stream when a chunk is transmitted that is smaller than the chunk_size parameter set in its constructor. This meant that to stream an unknown amount of data, each chunk would have to be at least 256 KiB, and I would have to buffer output until that size was reached before it could be transmitted.
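The buffering rule above can be illustrated without any network calls. `ChunkBuffer` below is a hypothetical stand-in and is not part of google-resumable-media. It accepts writes of any size and releases exactly `chunk_size` bytes whenever enough data is buffered. On `close()`, it emits whatever is left over. That remainder is shorter than `chunk_size` (possibly empty), so under `stream_final=False` it is what marks the end of the stream. The 256 KiB multiple check reflects the GCS documentation's requirement for non-final chunks.

```python
from typing import List

UPLOAD_GRANULARITY = 256 * 1024  # GCS chunk sizes should be multiples of 256 KiB


class ChunkBuffer:
    """Hypothetical stand-in: collect arbitrary writes, release fixed-size chunks."""

    def __init__(self, chunk_size: int = UPLOAD_GRANULARITY) -> None:
        if chunk_size <= 0 or chunk_size % UPLOAD_GRANULARITY:
            raise ValueError('chunk_size must be a positive multiple of 256 KiB')
        self.chunk_size = chunk_size
        self._buffer = bytearray()
        self.sent: List[bytes] = []  # stands in for chunks sent to GCS

    def write(self, data: bytes) -> int:
        self._buffer += data
        # Only full chunks may go out mid-stream; a short one would end it.
        while len(self._buffer) >= self.chunk_size:
            self.sent.append(bytes(self._buffer[:self.chunk_size]))
            del self._buffer[:self.chunk_size]
        return len(data)

    def close(self) -> None:
        # The final chunk is the remainder: short, possibly empty.
        self.sent.append(bytes(self._buffer))
        self._buffer.clear()


buf = ChunkBuffer()
for _ in range(1000):
    buf.write(b'x' * 1000)  # 1,000,000 bytes in total
buf.close()
print([len(c) for c in buf.sent])  # [262144, 262144, 262144, 213568]
```

The output shows that 1,000,000 bytes of small writes leave as three full 256 KiB chunks followed by one short final chunk.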


Putting it All Together

After getting a simple example working, I created a class that handles a single stream of data of unknown length, uploads it to a blob progressively, and recovers from network errors if they are detected.

To accomplish this, I implemented an object that both buffers data and has a file-like interface, so that ResumableUpload can use it as a stream and it can be passed into other functions that require file-like objects for writing data.

Here is my final implementation:

```python
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage


class GCSObjectStreamUpload(object):
    def __init__(
            self,
            client: storage.Client,
            bucket_name: str,
            blob_name: str,
            chunk_size: int=256 * 1024
        ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        # Only finalize the upload if the with-block exited cleanly.
        if exc_type is None:
            self.stop()

    def start(self):
        url = (
            f'https://www.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        # stream_final=False: the upload ends on the first short chunk.
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def stop(self):
        # Send whatever is left (less than chunk_size) as the final chunk.
        self._request.transmit_next_chunk(self._transport)

    def write(self, data: bytes) -> int:
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        # Transmit full chunks as soon as enough data is buffered.
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len

    def read(self, chunk_size: int) -> bytes:
        # I'm not good with efficient no-copy buffering so if this is
        # wrong or there's a better way to do this let me know! :-)
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read
```


The class can be used like so:

```python
client = storage.Client()

with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
    # 1024 writes of 1 KiB each: 1 MiB streamed in 256 KiB chunks
    for _ in range(1024):
        s.write(b'x' * 1024)
```

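On the buffering question raised in the read() comment, one lower-copy option is to keep pending data in a bytearray rather than rebuilding a bytes object on every read. As I understand it, CPython (3.4 and later) implements `del buf[:n]` on a bytearray by advancing an internal start offset instead of shifting the remaining bytes. With that approach, `read()` only copies the prefix it returns. `BufferedStream` below is a hypothetical, untested alternative that covers only the buffer half of the class (write, read, tell), with no GCS calls. The usage shows that `shutil.copyfileobj` can write into it like any file object.

```python
import io
import shutil


class BufferedStream:
    """Hypothetical file-like buffer: producers write(), a consumer read()s."""

    def __init__(self) -> None:
        self._buffer = bytearray()
        self._read = 0  # total bytes handed out so far, reported by tell()

    def write(self, data: bytes) -> int:
        self._buffer += data
        return len(data)

    def read(self, size: int = -1) -> bytes:
        if size < 0 or size > len(self._buffer):
            size = len(self._buffer)
        chunk = bytes(self._buffer[:size])  # copies only the returned prefix
        del self._buffer[:size]             # drop the consumed prefix in place
        self._read += size
        return chunk

    def tell(self) -> int:
        return self._read

    def __len__(self) -> int:
        return len(self._buffer)


stream = BufferedStream()
shutil.copyfileobj(io.BytesIO(b'x' * 1000), stream)  # any writer works
print(len(stream.read(300)), stream.tell(), len(stream))  # 300 300 700
```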

Thanks for reading!

