Skip to content

Commit 81eca35

Browse files
nodeceRobertIndie
authored andcommitted
fix: fix producer connection (#1243)
* fix: fix producer connection * Fix test * Fix nil pointer * Fix GetConnection err * Fix cnx (cherry picked from commit 29f2779)
1 parent 50dce7e commit 81eca35

File tree

3 files changed

+52
-32
lines changed

3 files changed

+52
-32
lines changed

pulsar/internal/rpc_client.go

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,18 @@ func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requ
150150

151151
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
152152
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
153-
c.metrics.RPCRequestCount.Inc()
154153
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
155154
if err != nil {
156155
return nil, err
157156
}
158157

158+
return c.RequestOnCnx(cnx, requestID, cmdType, message)
159+
}
160+
161+
func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
162+
message proto.Message) (*RPCResult, error) {
163+
c.metrics.RPCRequestCount.Inc()
164+
159165
ch := make(chan result, 1)
160166

161167
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
@@ -171,7 +177,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
171177
case res := <-ch:
172178
// Ignoring producer not ready response.
173179
// Continue to wait for the producer to create successfully
174-
if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
180+
if res.error == nil && res.Response != nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
175181
if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
176182
timeoutCh = nil
177183
break
@@ -184,28 +190,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
184190
}
185191
}
186192

187-
func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
188-
message proto.Message) (*RPCResult, error) {
189-
c.metrics.RPCRequestCount.Inc()
190-
191-
ch := make(chan result, 1)
192-
193-
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
194-
ch <- result{&RPCResult{
195-
Cnx: cnx,
196-
Response: response,
197-
}, err}
198-
close(ch)
199-
})
200-
201-
select {
202-
case res := <-ch:
203-
return res.RPCResult, res.error
204-
case <-time.After(c.requestTimeout):
205-
return nil, ErrRequestTimeOut
206-
}
207-
}
208-
209193
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
210194
c.metrics.RPCRequestCount.Inc()
211195
return cnx.SendRequestNoWait(baseCommand(cmdType, message))

pulsar/producer_partition.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -284,20 +284,24 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
284284

285285
cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr)
286286
// registering the producer first in case broker sends commands in the middle
287-
if err == nil {
288-
p._setConn(cnx)
289-
err = p._getConn().RegisterListener(p.producerID, p)
290-
if err != nil {
291-
p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID)
292-
}
287+
if err != nil {
288+
p.log.Error("Failed to get connection")
289+
return err
290+
}
291+
292+
p._setConn(cnx)
293+
err = p._getConn().RegisterListener(p.producerID, p)
294+
if err != nil {
295+
p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID)
293296
}
294297

295-
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
298+
res, err := p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_PRODUCER, cmdProducer)
296299
if err != nil {
300+
p._getConn().UnregisterListener(p.producerID)
297301
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
298302
if errors.Is(err, internal.ErrRequestTimeOut) {
299303
id := p.client.rpcClient.NewRequestID()
300-
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
304+
_, _ = p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_CLOSE_PRODUCER,
301305
&pb.CommandCloseProducer{
302306
ProducerId: &p.producerID,
303307
RequestId: &id,

pulsar/producer_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"testing"
3030
"time"
3131

32+
"github.com/stretchr/testify/require"
33+
3234
"github.com/stretchr/testify/assert"
3335
"google.golang.org/protobuf/proto"
3436

@@ -2474,3 +2476,33 @@ func TestDisableReplication(t *testing.T) {
24742476
assert.NoError(t, err)
24752477
assert.Equal(t, []string{"__local__"}, msgMetadata.GetReplicateTo())
24762478
}
2479+
2480+
func TestProducerWithMaxConnectionsPerBroker(t *testing.T) {
2481+
client, err := NewClient(ClientOptions{
2482+
URL: serviceURL,
2483+
MaxConnectionsPerBroker: 8,
2484+
})
2485+
require.NoError(t, err)
2486+
defer client.Close()
2487+
2488+
for i := 0; i < 10; i++ {
2489+
testProducer, err := client.CreateProducer(ProducerOptions{
2490+
Topic: newTopicName(),
2491+
Schema: NewBytesSchema(nil),
2492+
})
2493+
require.NoError(t, err)
2494+
require.NotNil(t, testProducer)
2495+
2496+
var ok int32
2497+
testProducer.SendAsync(context.Background(), &ProducerMessage{Value: []byte("hello")},
2498+
func(id MessageID, producerMessage *ProducerMessage, err error) {
2499+
if err == nil {
2500+
atomic.StoreInt32(&ok, 1)
2501+
}
2502+
})
2503+
require.Eventually(t, func() bool {
2504+
return atomic.LoadInt32(&ok) == 1
2505+
}, 3*time.Second, time.Millisecond*100)
2506+
testProducer.Close()
2507+
}
2508+
}

0 commit comments

Comments
 (0)