package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HighThroughputProducer {

    private final KafkaProducer<String, String> producer;
    private final ThreadPoolExecutor executor;

    public HighThroughputProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // acks=1: leader-only acknowledgment. Note: with retries > 0 and
        // max.in.flight > 1, records can be reordered on retry.
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Batching: up to 32 KB per batch, waiting at most 10 ms for a batch to fill.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // 64 MB of buffer memory for unsent records; 10 MB maximum request size.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);
        this.producer = new KafkaProducer<>(props);
        // CallerRunsPolicy applies back-pressure: when the 1000-slot queue is
        // full, the submitting thread runs the task itself instead of dropping it.
        this.executor = new ThreadPoolExecutor(
                20, 50, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    /** Fire-and-forget send; failures are only logged in the callback. */
    public void sendAsync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            }
        });
    }

    /** Hands each (unkeyed) send off to the thread pool. */
    public void sendBatch(String topic, List<String> messages) {
        for (String message : messages) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            executor.submit(() -> producer.send(record));
        }
    }

    /** Sends from a parallel stream; KafkaProducer.send() is thread-safe. */
    public void sendParallel(String topic, List<String> messages) {
        messages.parallelStream().forEach(message -> {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            producer.send(record);
        });
    }

    /** Blocks until the broker acknowledges the record, or fails after 30 s. */
    public void sendSync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        try {
            producer.send(record).get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Send failed", e);
        }
    }

    public void close() {
        // Drain the pool first so queued sendBatch tasks still have a live
        // producer, then close the producer (which flushes buffered records).
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producer.close();
    }
}
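A minimal usage sketch follows. The topic name "demo-topic", the keys, and the payloads are placeholder values, and List.of assumes Java 9+; closing in a finally block matters because close() is what flushes any records still sitting in the producer's batch buffer.

public class ProducerDemo {
    public static void main(String[] args) {
        HighThroughputProducer producer = new HighThroughputProducer();
        try {
            // Async send with a key: records sharing a key land on the same partition.
            producer.sendAsync("demo-topic", "user-42", "{\"event\":\"click\"}");

            // Thread-pool batch send for a list of unkeyed messages.
            producer.sendBatch("demo-topic", List.of("m1", "m2", "m3"));

            // Sync send when the caller must know the write was acknowledged.
            producer.sendSync("demo-topic", "user-42", "{\"event\":\"purchase\"}");
        } finally {
            producer.close();
        }
    }
}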