Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / Python

Google Cloud Pub/Sub - Setting Up Your Application

0.00/5 (No votes)
3 Dec 2014CC (Attr 3U)3 min read 10.5K  
Before you can make requests to the Google Cloud Pub/Sub API, your application must set up authorization, using the OAuth 2.0 protocol. If you are using the Google Cloud Pub/Sub client library, you must also create an instance of the Pubsub class.

This article is for our sponsors at CodeProject. These articles are intended to provide you with information on products and services that we consider useful and of value to developers

Before you can make requests to the Google Cloud Pub/Sub API, your application must set up authorization, using the OAuth 2.0 protocol. If you are using the Google Cloud Pub/Sub client library, you must also create an instance of the Pubsub class.

You should also implement a system to handle retry attempts in case of RPC request failures.

Authorization

The Google APIs client libraries handle much of the authorization process for you. The details of the authorization process, or "flow," for OAuth 2.0 vary somewhat depending on what kind of application you're using with Google Cloud Pub/Sub. (For detailed information about flows for various types of applications, see Google's OAuth 2.0 documentation.)

Since most Pub/Sub messaging operations occur offline, with no human intervention, the most typical flow for a Google Cloud Pub/Sub application is server-to-server, using a service account. Your application signs the authorization request with the secret key to obtain a web access token that accompanies every call to the API. Google App Engine and Google Compute Engine use service accounts under the hood, and simplify the authorization process.

  1. Authorization scope
  2. Authorizing requests from a Google App Engine application
  3. Authorizing requests from a Google Compute Engine application
  4. Authorizing requests from a local or third-party host

Authorization scope

When you request accessing using OAuth 2.0, your application specifies the authorization scope information for Google Cloud Pub/Sub. You can use either of the following:

Scope Meaning
https://www.googleapis.com/auth/pubsub Full access.
https://www.googleapis.com/auth/cloud-platform Full access.

Tip: If you're using the Google Cloud Pub/Sub client libraries, you can get the scope programmatically through an API.

Authorizing requests from a Google App Engine application

The following samples demonstrate how to set up your client and use the App Engine App Identity API to authorize calls to the Google Cloud Pub/Sub API.

Java

This sample uses the Google APIs Client Library for Java.

The custom RetryHttpInitializerWrapper class is described in the Retry Handling section.

import com.google.api.client.extensions.appengine.http.UrlFetchTransport;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.extensions.appengine.auth.oauth2
    .AppIdentityCredential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

import java.io.IOException;

/**
 * Create a Pubsub client on App Engine.
 */
public class AppEngineConfiguration {

    private static final HttpTransport TRANSPORT =
        UrlFetchTransport.getDefaultInstance();
    private static final JsonFactory JSON_FACTORY =
        JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient() throws IOException {
        GoogleCredential credential = 
            new AppIdentityCredential.AppEngineCredentialWrapper(
                TRANSPORT, JSON_FACTORY).createScoped(PubsubScopes.all());
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures.  We provide a simple reference
        // implementation in the "Retry Handling" section.
        HttpRequestInitializer initializer =
            new RetryHttpInitializerWrapper(credential);
        return new Pubsub.Builder(TRANSPORT, JSON_FACTORY, initializer)
            .build();
    }
}

Python

This sample uses the Google APIs Client Library for Python.

import httplib2

import oauth2client.appengine as gae_oauth2client
from apiclient import discovery
from google.appengine.api import memcache

PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']

def create_pubsub_client():
    credentials = gae_oauth2client.AppAssertionCredentials(
        scope=PUBSUB_SCOPES)
    http = httplib2.Http(memcache)
    credentials.authorize(http)

    return discovery.build('pubsub', 'v1beta1', http=http)

Authorizing requests from a Google Compute Engine application

If your application runs on a Google Compute Engine instance, it authenticates using an access token obtained from a metadata server.

Note: Be sure you have configured your instance to use service accounts and have added the Pub/Sub scope listed above. For detailed procedures, see Preparing Your Instance to Use Service Accounts in the Google Compute Engine documentation.

The following samples demonstrate how to set up your client and use the Compute Credential API to authorize calls to the Google Cloud Pub/Sub API.

Java

This sample uses the Google APIs Client Library for Java.

The custom RetryHttpInitializerWrapper class is described in the Retry Handling section.

import com.google.api.client.googleapis.compute.ComputeCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;

import com.google.api.services.pubsub.Pubsub;

import java.io.IOException;
import java.security.GeneralSecurityException;

/**
 * Create a Pubsub client on Compute Engine.
 */
public class ComputeEngineConfiguration {

    private static final JsonFactory JSON_FACTORY =
        JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient()
            throws GeneralSecurityException, IOException {
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        ComputeCredential credential =
            new ComputeCredential.Builder(transport, JSON_FACTORY)
                .build();
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures.  We provide a simple reference
        // implementation in the "Retry Handling" section.
        HttpRequestInitializer initializer =
            new RetryHttpInitializerWrapper(credential);
        return new Pubsub.Builder(transport, JSON_FACTORY, initializer)
            .build();
    }
}

Python

This sample uses the Google APIs Client Library for Python.

import httplib2

import oauth2client.gce as gce_oauth2client
from apiclient import discovery

PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']

def create_pubsub_client():
    credentials = gce_oauth2client.AppAssertionCredentials(
        scope=PUBSUB_SCOPES)
    http = httplib2.Http()
    credentials.authorize(http)
    return discovery.build('pubsub', 'v1beta1', http=http)

Authorizing requests from a local or third-party host (using a service account)

If you're running a local client or in a non-Google Cloud environment, you'll need to provide the credentials you obtained when setting up a service account. The following sample demonstrates how to set up your client and use your service account email address and private key to authorize calls to the Google Cloud Pub/Sub API.

Java

This sample uses the Google APIs Client Library for Java.

The custom RetryHttpInitializerWrapper class is described in the Retry Handling section.

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;

/**
 * Create a Pubsub client with the service account.
 */
public class ServiceAccountConfiguration {

    private static final JsonFactory JSON_FACTORY =
        JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient()
            throws IOException, GeneralSecurityException {
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        GoogleCredential credential = new GoogleCredential.Builder()
            .setTransport(transport)
            .setJsonFactory(JSON_FACTORY)
            .setServiceAccountScopes(PubsubScopes.all())

            // Obtain this from the "APIs & auth" -> "Credentials"
            // section in the Google Developers Console:
            // https://console.developers.google.com/
            // (and put the e-mail address into your system property obviously)
            .setServiceAccountId(System.getProperty("SERVICE_ACCOUNT_EMAIL"))

            // Download this file from "APIs & auth" -> "Credentials"
            // section in the Google Developers Console:
            // https://console.developers.google.com/
            .setServiceAccountPrivateKeyFromP12File(
                new File(System.getProperty("PRIVATE_KEY_FILE_PATH")))
            .build();
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures.  We provide a simple reference
        // implementation in the "Retry Handling" section.
        HttpRequestInitializer initializer =
            new RetryHttpInitializerWrapper(credential);
        return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build();
    }
}

Python

This sample uses the Google APIs Client Library for Python.

import httplib2

from apiclient import discovery
from oauth2client import client as oauth2client

PUBSUB_SCOPES = ['https://www.googleapis.com/auth/pubsub']

def create_pubsub_client():
    private_key = None
    # Obtain this file from the "APIs & auth" -> "Credentials"
    # section in the Google Developers Console:
    # https://console.developers.google.com/
    with open('MY_PRIVATE_KEY_FILE.p12', 'r') as f:
        private_key = f.read()
    credentials = oauth2client.SignedJwtAssertionCredentials(
        # Obtain this from the "APIs & auth" -> "Credentials"
        # section in the Google Developers Console:
        # https://console.developers.google.com/
        'MY_SERVICE_ACCOUNT_EMAIL',
        private_key,
        PUBSUB_SCOPES)
    http = httplib2.Http()
    credentials.authorize(http)

    return discovery.build('pubsub', 'v1beta1', http=http)

Retry Handling

You should implement code to handle retry attempts with an increasing backoff in the case of RPC failures.

Java

Here is a sample Java class to handle retry attempts for you.

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpBackOffIOExceptionHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.common.base.Preconditions;

import java.io.IOException;
import java.util.logging.Logger;

/**
 * RetryHttpInitializerWrapper will automatically retry upon RPC
 * failures, preserving the auto-refresh behavior of the Google
 * Credentials.
 */
public class RetryHttpInitializerWrapper implements HttpRequestInitializer {

    private static final Logger LOG =
        Logger.getLogger(RetryHttpInitializerWrapper.class.getName());

    // Intercepts the request for filling in the "Authorization"
    // header field, as well as recovering from certain unsuccessful
    // error codes wherein the Credential must refresh its token for a
    // retry.
    private final Credential wrappedCredential;

    // A sleeper; you can replace it with a mock in your test.
    private final Sleeper sleeper;

    public RetryHttpInitializerWrapper(Credential wrappedCredential) {
        this(wrappedCredential, Sleeper.DEFAULT);
    }

    // Use only for testing.
    RetryHttpInitializerWrapper(
            Credential wrappedCredential, Sleeper sleeper) {
        this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
        this.sleeper = sleeper;
    }

    @Override
    public void initialize(HttpRequest request) {
        final HttpUnsuccessfulResponseHandler backoffHandler =
            new HttpBackOffUnsuccessfulResponseHandler(
                new ExponentialBackOff())
                    .setSleeper(sleeper);
        request.setInterceptor(wrappedCredential);
        request.setUnsuccessfulResponseHandler(
                new HttpUnsuccessfulResponseHandler() {
                    @Override
                    public boolean handleResponse(
                            HttpRequest request,
                            HttpResponse response,
                            boolean supportsRetry) throws IOException {
                        if (wrappedCredential.handleResponse(
                                request, response, supportsRetry)) {
                            // If credential decides it can handle it,
                            // the return code or message indicated
                            // something specific to authentication,
                            // and no backoff is desired.
                            return true;
                        } else if (backoffHandler.handleResponse(
                                request, response, supportsRetry)) {
                            // Otherwise, we defer to the judgement of
                            // our internal backoff handler.
                            LOG.info("Retrying "
                                    + request.getUrl().toString());
                            return true;
                        } else {
                            return false;
                        }
                    }
                });
        request.setIOExceptionHandler(
            new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
                .setSleeper(sleeper));
    }
}

Python

You can pass num_retries=n argument to the API's execute() calls in order to retry with exponential backoff upon intermittent failures.

resp = client.subscriptions().pullBatch(body=body).execute(num_retries=3)

Except as otherwise noted, the code samples of this page is licensed under the Apache 2.0 License.

License

This article, along with any associated source code and files, is licensed under The Creative Commons Attribution 3.0 Unported License