Настройка SignalR и служебной шины в службе Azure Service Fabric.

Я переношу существующую рабочую роль облачной службы в Service Fabric как службу без сохранения состояния. Исходная облачная служба использует SignalR и служебную шину (в качестве объединительной панели SignalR) для отправки уведомлений любому прослушивающему клиенту. Существует класс Startup, который выполняет некоторые настройки:

class Startup
{
    public void Configuration(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[key]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<MyHub>();
    }
}

В методе OnStart() для WorkerRole я запускаю OWIN с помощью:

var endpoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HttpEndpoint"];
var baseUri = $"{endpoint.Protocol}://{endpoint.IPEndpoint}";
var app = WebApp.Start<Startup>(new StartOptions(url: baseUri));

Как это (т. е. подключение к объединительной плате служебной шины SignalR) выполняется для службы без сохранения состояния в Service Fabric?


person ProfNimrod    schedule 22.09.2016    source источник


Ответы (2)


С помощью https://github.com/marcinbudny/SignalRSelfHostScaleOut (это пример масштабирования с использованием Redis) Думаю, я это слизал.

В ServiceManifest.xml я добавил следующую конечную точку:

<Endpoint Protocol="http" Name="ServiceEndpoint" Type="Input" Port="8322" />

Я также добавил класс Startup:

public static class Startup
{
    public static void ConfigureApp(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[value]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<InSysMainHub>();
    }
}

Также был добавлен класс OwinCommunicationListener:

public class OwinCommunicationListener : ICommunicationListener
{
    private readonly ServiceEventSource eventSource;
    private readonly Action<IAppBuilder> startup;
    private readonly ServiceContext serviceContext;
    private readonly string endpointName;
    private readonly string appRoot;

    private IDisposable webApp;
    private string publishAddress;
    private string listeningAddress;

    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
        : this(startup, serviceContext, eventSource, endpointName, null)
    {
    }

    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName, string appRoot)
    {
        if (startup == null)
        {
            throw new ArgumentNullException(nameof(startup));
        }

        if (serviceContext == null)
        {
            throw new ArgumentNullException(nameof(serviceContext));
        }

        if (endpointName == null)
        {
            throw new ArgumentNullException(nameof(endpointName));
        }

        if (eventSource == null)
        {
            throw new ArgumentNullException(nameof(eventSource));
        }

        this.startup = startup;
        this.serviceContext = serviceContext;
        this.endpointName = endpointName;
        this.eventSource = eventSource;
        this.appRoot = appRoot;
    }


    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
        var protocol = serviceEndpoint.Protocol;
        int port = serviceEndpoint.Port;

        if (this.serviceContext is StatefulServiceContext)
        {
            StatefulServiceContext statefulServiceContext = (StatefulServiceContext) serviceContext;

            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}{3}/{4}/{5}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/',
                statefulServiceContext.PartitionId,
                statefulServiceContext.ReplicaId,
                Guid.NewGuid());
        }
        else if (serviceContext is StatelessServiceContext)
        {
            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/');
        }
        else
        {
            throw new InvalidOperationException();
        }

        publishAddress = listeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);

        try
        {
            eventSource.Message("Starting web server on " + listeningAddress);
            webApp = WebApp.Start(listeningAddress, appBuilder => startup.Invoke(appBuilder));
            eventSource.Message("Listening on " + this.publishAddress);
            return Task.FromResult(this.publishAddress);
        }
        catch (Exception ex)
        {
            eventSource.Message("Web server failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
            StopWebServer();
            throw;
        }
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.eventSource.Message("Closing web server on endpoint {0}", this.endpointName);

        this.StopWebServer();

        return Task.FromResult(true);
    }

    public void Abort()
    {
        this.eventSource.Message("Aborting web server on endpoint {0}", this.endpointName);

        this.StopWebServer();
    }

    private void StopWebServer()
    {
        if (this.webApp != null)
        {
            try
            {
                this.webApp.Dispose();
            }
            catch (ObjectDisposedException)
            {
                // no-op
            }
        }
    }
}

И затем, наконец, я изменил метод CreateServiceInstanceListeners в своем коде службы без сохранения состояния на:

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        return new[]
        {
            new ServiceInstanceListener(serviceContext => new OwinCommunicationListener(Startup.ConfigureApp, serviceContext, ServiceEventSource.Current, "ServiceEndpoint"))
        };
    }
person ProfNimrod    schedule 24.09.2016

Создайте службу без сохранения состояния с прослушивателем Owin. Затем при запуске настройте signalR и объединительную плату (служебную шину или sql). В идеале вы должны столкнуться с проблемой согласования (рукопожатие между клиентом Signalr и сервером). На этом этапе попробуйте настроить запрос между источниками, пример кода для постоянного соединения будет выглядеть так, как показано ниже.

Также обратите внимание на строку appBuilder.UseAesDataProtectorProvider("Ваш ключ"), так как она важна. В результате вы не будете получать HTTP 400 для подключения большую часть времени. Это связано с тем, что SignalR будет делать как минимум 2 запроса при рукопожатии, и они обычно попадают на два разных компьютера.

Спасибо marcin budny за объяснение.

var config = new HttpConfiguration();

// Configure your origins as required.

var cors = new EnableCorsAttribute("*", "*", "*");

config.EnableCors(cors);

FormatterConfig.ConfigureFormatters(config.Formatters);

RouteConfig.RegisterRoutes(config.Routes);

appBuilder.UseWebApi(config);

GlobalHost.DependencyResolver.UseServiceBus("yourconnection string comes here", "signalrbackplaneserver");
appBuilder.UseAesDataProtectorProvider("some password");
appBuilder.Map("/echo", map =>
{
                map.UseCors(CorsOptions.AllowAll).RunSignalR<MyEndPoint>();
});
person Venkatesh    schedule 10.11.2016