61

Pub/Sub Local Emulator

 4 years ago
source link: https://www.tuicool.com/articles/AjIz22i
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

aQrmmeV.png!web

This is the kind of pub they're talking about, right? :P

Pub/Sub is a nice tool provided by GCP. It is really handy and can help you with the messaging challenges your application might face. Actually, if you work with GCP, it is the managed messaging solution that you can use.

You might also like: Spring Boot and GCP Cloud Pub/Sub

As expected, working with the actual Pub/Sub solution comes with some quota, so for development, it is essential to use something that is not going to cost you.

In these cases, you can use the Pub/Sub emulator . To get started with the emulator, you need to install it.

gcloud components install pubsub-emulator

It is indeed convenient, however, to have a Docker image since it is way more portable. Unfortunately, there is no official image for that from Google Cloud. In any case, you can use one of the solutions available on Docker Hub.

Now let's run it:

gcloud beta emulators pubsub start --project=test-project

After that, your application can connect to the pub/sub emulator. The default port is 8085.

I will use a Java unit test as an example for this one:

package org.gkatzioura.pubsub;

import java.io.IOException;
import java.nio.charset.Charset;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class LocalPubSubTest {

    private static final String PROJECT = "test-project";
    private static final String SUBSCRIPTION_NAME = "SUBSCRIBER";
    private static final String TOPIC_NAME = "test-topic-id";

    private static final String hostPort = "127.0.0.1:8085";

    private ManagedChannel channel;
    private TransportChannelProvider channelProvider;
    private TopicAdminClient topicAdmin;

    private Publisher publisher;
    private SubscriberStub subscriberStub;
    private SubscriptionAdminClient subscriptionAdminClient;

    private ProjectTopicName topicName = ProjectTopicName.of(PROJECT, TOPIC_NAME);
    private ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT, SUBSCRIPTION_NAME);

    private Subscription subscription;

    @Before
    public void setUp() throws Exception {
        channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
        channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

        CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

        topicAdmin = createTopicAdmin(credentialsProvider);
        topicAdmin.createTopic(topicName);

        publisher = createPublisher(credentialsProvider);
        subscriberStub = createSubscriberStub(credentialsProvider);
        subscriptionAdminClient = createSubscriptionAdmin(credentialsProvider);
        subscription = subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
    }

    @After
    public void tearDown() throws Exception {
        topicAdmin.deleteTopic(topicName);
        subscriptionAdminClient.deleteSubscription(subscription.getName());
        channel.shutdownNow();
    }

    @Test
    public void testLocalPubSub() throws Exception {
        final String messageText = "text";
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                                                   .setData(ByteString.copyFrom(messageText, Charset.defaultCharset()))
                                                   .build();
        publisher.publish(pubsubMessage).get();

        PullRequest pullRequest = PullRequest.newBuilder()
                                             .setMaxMessages(1)
                                             .setReturnImmediately(true) // return immediately if messages are not available
                                             .setSubscription(subscription.getName())
                                             .build();

        PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
        String receiveMessageText = pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8();

        Assert.assertEquals(messageText, receiveMessageText);
    }

    private TopicAdminClient createTopicAdmin(CredentialsProvider credentialsProvider) throws IOException {
        return TopicAdminClient.create(
                TopicAdminSettings.newBuilder()
                                  .setTransportChannelProvider(channelProvider)
                                  .setCredentialsProvider(credentialsProvider)
                                  .build()
        );
    }

    private SubscriptionAdminClient createSubscriptionAdmin(CredentialsProvider credentialsProvider) throws IOException {
        SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
                                                                                       .setCredentialsProvider(credentialsProvider)
                                                                                       .setTransportChannelProvider(channelProvider)
                                                                                       .build();
        return SubscriptionAdminClient.create(subscriptionAdminSettings);
    }

    private Publisher createPublisher(CredentialsProvider credentialsProvider) throws IOException {
        return Publisher.newBuilder(topicName)
                        .setChannelProvider(channelProvider)
                        .setCredentialsProvider(credentialsProvider)
                        .build();
    }

    private SubscriberStub createSubscriberStub(CredentialsProvider credentialsProvider) throws IOException {
        SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
                                                                              .setTransportChannelProvider(channelProvider)
                                                                              .setCredentialsProvider(credentialsProvider)
                                                                              .build();
        return GrpcSubscriberStub.create(subscriberStubSettings);
    }

}

That's it. Now you can have some cost-efficient unit tests!

Further Reading

Event-Driven Pub-Sub Design

Bootiful GCP: Spring Cloud Stream With Google Cloud Pub/Sub


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK